jenkins-bot has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/403334 )

Change subject: Convert to pre-built folds in binary files
......................................................................


Convert to pre-built folds in binary files

I've been suspicious of our excessive memory usage and suspect that
shuffling the data through Spark and its various caching layers
accounts for part of it.

Try a different approach by adding a new utility script, make_folds,
which splits the data into CV folds and writes out all the
partitions in xgboost binary format. At training time these files
are copied from HDFS to a temporary file on the local machine so
xgboost can read them. Future updates could perhaps track which
datasets are already available locally, rather than re-transferring
the dataset for each model.
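[Editor's sketch] The HDFS-to-local handoff described above can be illustrated roughly as follows. This is a minimal sketch, not mjolnir's actual API: the function name and the idea of reading through a path (e.g. a fuse mount such as /mnt/hdfs) are assumptions for illustration only.

```python
import shutil
import tempfile


def fetch_fold_locally(mounted_fold_path):
    """Copy a pre-built xgboost binary fold to a local temporary file
    so the xgboost C++ loader can read it from the local filesystem.

    `mounted_fold_path` is a hypothetical path to the fold file as seen
    through an HDFS mount. Cleanup of the temp file is left to the caller.
    """
    tmp = tempfile.NamedTemporaryFile(suffix=".xgb", delete=False)
    with open(mounted_fold_path, "rb") as src:
        shutil.copyfileobj(src, tmp)
    tmp.close()
    # Training would then load the local copy, e.g.:
    #   dtrain = xgboost.DMatrix(tmp.name)
    return tmp.name
```

As the commit message notes, each model re-transfers its dataset this way; tracking already-fetched local copies would avoid the repeated copy.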

Change-Id: Ic8a1d4f405bcd1ad07dd53ef392f47d8dfa89246
---
M .gitignore
M example_train.yaml
M jvm/src/main/scala/org/wikimedia/search/mjolnir/DataWriter.scala
M jvm/src/main/scala/org/wikimedia/search/mjolnir/MlrXGBoost.scala
M jvm/src/test/scala/org/wikimedia/search/mjolnir/DBNSuite.scala
M jvm/src/test/scala/org/wikimedia/search/mjolnir/DataWriterSuite.scala
M mjolnir/test/conftest.py
A mjolnir/test/fixtures/datasets/test.xgb
A mjolnir/test/fixtures/datasets/test.xgb.query
A mjolnir/test/fixtures/datasets/train.xgb
A mjolnir/test/fixtures/datasets/train.xgb.query
M mjolnir/test/fixtures/load_config/example_train.expect
A mjolnir/test/test_utils.py
M mjolnir/test/training/test_hyperopt.py
M mjolnir/test/training/test_tuning.py
M mjolnir/test/training/test_xgboost.py
M mjolnir/training/hyperopt.py
M mjolnir/training/tuning.py
M mjolnir/training/xgboost.py
A mjolnir/utilities/make_folds.py
M mjolnir/utilities/spark.py
M mjolnir/utilities/training_pipeline.py
A mjolnir/utils.py
M setup.py
24 files changed, 1,112 insertions(+), 442 deletions(-)

Approvals:
  jenkins-bot: Verified
  DCausse: Looks good to me, approved



diff --git a/.gitignore b/.gitignore
index 83930f6..ca06e47 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,6 +3,7 @@
 *.py[cod]
 *~
 /jvm/target
+/jvm/mjolnir.iml
 
 # Distribution / packaging
 venv/
diff --git a/example_train.yaml b/example_train.yaml
index 7b1749c..36abc70 100644
--- a/example_train.yaml
+++ b/example_train.yaml
@@ -20,6 +20,10 @@
        hdfs_training_data_path: "hdfs://analytics-hadoop/%(training_data_path)s"
         # Fully qualified local path to the training data
         local_training_data_path: "/mnt/hdfs/%(training_data_path)s"
+        # Fully qualified HDFS path to pre-folded data
+        fold_data_path: "user/%(USER)s/mjolnir/%(marker)s-folds_%(profile_name)s"
+        hdfs_fold_data_path: "hdfs://analytics-hadoop/%(fold_data_path)s"
+        local_fold_data_path: "/mnt/hdfs/%(training_data_path)s"
         # Base directory used to build path to write training output to
         base_training_output_dir: "%(HOME)s/training_size"
         # Number of cpu cores to assign per task. Must be a multiple of
@@ -60,7 +64,7 @@
         executor-memory: "%(executor_memory)s"
         # Source our jvm dependencies from archiva. 
        repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
-        packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:%(spark_version)s
+        packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:%(spark_version)s
     spark_conf:
         spark.task.cpus: "%(cores_per_task)s"
         spark.yarn.executor.memoryOverhead: "%(executor_memory_overhead)s"
@@ -76,7 +80,6 @@
             template_vars:
                 cores_per_executor: 4
                 cores_per_task: 4
-                executor_memory: 2G
                 executor_memory_overhead: 6144
         data_pipeline:
             spark_command: "%(SPARK_HOME)s/bin/spark-submit"
@@ -93,6 +96,19 @@
                 samples-per-wiki: 35000000
                 search-cluster: codfw
                 min-sessions: "%(min_sessions_per_query)s"
+        make_folds:
+            spark_command: "%(SPARK_HOME)s/bin/spark-submit"
+            mjolnir_utility_path: "%(mjolnir_utility_path)s"
+            mjolnir_utility: make_folds
+            spark_conf:
+                spark.locality.wait: 0
+                spark.dynamicAllocation.maxExecutors: 100
+            cmd_args:
+                input: "%(hdfs_training_data_path)s"
+                output-dir: "%(hdfs_fold_data_path)s"
+                num-folds: 5
+                num-workers: 1
+                max-executors: 50
         training_pipeline:
             spark_command: "%(SPARK_HOME)s/bin/spark-submit"
             mjolnir_utility_path: "%(mjolnir_utility_path)s"
@@ -106,38 +122,42 @@
             spark_args:
                 driver-memory: 3G
             spark_conf:
-                # Disabling auto broadcast join prevents memory explosion when spark
-                # mis-predicts the size of a dataframe. (where does this happen?)
-                spark.sql.autoBroadcastJoinThreshold: -1
                # Adjusting up executor idle timeout from 60s to 180s is a bit greedy,
                # but prevents a whole bunch of log spam from spark killing executors
                 # between CV runs
                 spark.dynamicAllocation.executorIdleTimeout: 180s
             cmd_args:
-                input: "%(hdfs_training_data_path)s"
-                output: "%(base_training_output_dir)s/%(marker)s_%(profile_name)s"
+                input: "%(hdfs_fold_data_path)s"
+                output: "%(base_training_output_dir)s/%(marker)s-folds_%(profile_name)s"
 
 # Individual training groups
 profiles:
     large:
-        # 12M to 30M observations. 4M to 12M per executor.
-        # Approximately 63 executors, 378 cores, 838GB memory
+        # 12M to 30M observations.
+        # Approximately 80 executors, 480 cores, 480GB memory
+        # 8-11 min per enwiki model @ 6 cores = 48 to 66 cpu minutes
+        # 2h40min to final model training
         wikis:
             - enwiki
             - dewiki
         commands:
+            make_folds:
+                template_vars:
+                    # enough space to load datasets into c++
+                    executor_memory_overhead: 5120
+                cmd_args:
+                    num-folds: 3
             training_pipeline:
                 template_vars:
                     cores_per_executor: 6
                     cores_per_task: 6
-                    executor_memory: 4G
-                    executor_memory_overhead: 9216
+                    #  4096 fails with matrix of 350-400M floats
+                    # cv-test-dcg@10: 0.8619
+                    executor_memory_overhead: 6144
                 spark_conf:
-                    spark.dynamicAllocation.maxExecutors: 65
+                    spark.dynamicAllocation.maxExecutors: 85
                 cmd_args:
-                    workers: 3
-                    cv-jobs: 22
-                    folds: 3
+                    cv-jobs: 80
                     final-trees: 100
 
     medium:
@@ -149,18 +169,19 @@
             - frwiki
             - ruwiki
         commands:
+            make_folds:
+                template_vars:
+                    # enough space to load datasets into c++
+                    executor_memory_overhead: 1536
             training_pipeline:
                 template_vars:
-                    cores_per_executor: 6
-                    cores_per_task: 6
-                    executor_memory: 3G
-                    executor_memory_overhead: 9216
+                    cores_per_executor: 4
+                    cores_per_task: 4
+                    executor_memory_overhead: 2048
                 spark_conf:
-                    spark.dynamicAllocation.maxExecutors: 75
+                    spark.dynamicAllocation.maxExecutors: 130
                 cmd_args:
-                    workers: 1
-                    cv-jobs: 70
-                    folds: 5
+                    cv-jobs: 125
                     final-trees: 100
 
     small:
@@ -177,21 +198,21 @@
             - fiwiki
             - jawiki
             - arwiki
-            - itwiki
             - nlwiki
             - zhwiki
             - plwiki
         commands:
+            make_folds:
+                template_vars:
+                    # enough space to load datasets into c++
+                    executor_memory_overhead: 1024
             training_pipeline:
                 template_vars:
                     cores_per_executor: 4
                     cores_per_task: 4
-                    executor_memory: 2G
-                    executor_memory_overhead: 6144
+                    executor_memory_overhead: 1024
                 spark_conf:
-                    spark.dynamicAllocation.maxExecutors: 105
+                    spark.dynamicAllocation.maxExecutors: 130
                 cmd_args:
-                    workers: 1
-                    cv-jobs: 100
-                    folds: 5
+                    cv-jobs: 125
                     final-trees: 500
diff --git a/jvm/src/main/scala/org/wikimedia/search/mjolnir/DataWriter.scala b/jvm/src/main/scala/org/wikimedia/search/mjolnir/DataWriter.scala
index dfebed2..8d6976b 100644
--- a/jvm/src/main/scala/org/wikimedia/search/mjolnir/DataWriter.scala
+++ b/jvm/src/main/scala/org/wikimedia/search/mjolnir/DataWriter.scala
@@ -138,7 +138,8 @@
 
   /**
    * @param df         Output from data_pipeline. Must have be repartitioned on query and sorted by query
-    *                   within partitions.
+    *                   within partitions. This should have a large number of partitions or later coalesce
+    *                   will be imbalanced.
    * @param numWorkers The number of partitions each data file will be emitted as
    * @param pathFormat Format for hdfs paths. Params are %s: name, %s: fold, %d: partition
     * @param foldCol    Long column to source which fold a row belongs to
@@ -146,13 +147,15 @@
     *         from split name to hdfs path for that partition.
     */
   def write(
-             df: DataFrame,
-             numWorkers: Int,
-             pathFormat: String,
-             foldCol: Option[String]
-           ): Array[Array[Map[String, String]]] = {
+    df: DataFrame,
+    numWorkers: Int,
+    pathFormat: String,
+    foldCol: Option[String]
+  ): Array[Array[Map[String, String]]] = {
     val rdd = df.rdd.mapPartitions(formatPartition(df.schema, foldCol))
-
+    if (rdd.getNumPartitions < numWorkers) {
+      throw new java.io.IOException("Cannot have fewer partitions in input than output")
+    }
    val numFolds = foldCol.map { name => df.schema(name).metadata.getLong("num_folds").toInt }.getOrElse(1)
 
     try {
diff --git a/jvm/src/main/scala/org/wikimedia/search/mjolnir/MlrXGBoost.scala b/jvm/src/main/scala/org/wikimedia/search/mjolnir/MlrXGBoost.scala
index 1a2bebf..2e9f7b4 100644
--- a/jvm/src/main/scala/org/wikimedia/search/mjolnir/MlrXGBoost.scala
+++ b/jvm/src/main/scala/org/wikimedia/search/mjolnir/MlrXGBoost.scala
@@ -46,14 +46,14 @@
   }
 
   private[mjolnir] def buildDistributedBoosters(
-      rdds: RDD[Map[String, String]],
+      rdd: RDD[Map[String, String]],
       trainMatrix: String,
       params: Map[String, Any],
       rabitEnv: Option[java.util.Map[String, String]],
       numRounds: Int,
       earlyStoppingRound: Int = 0
   ): RDD[(Array[Byte], Map[String, Array[Float]])] =
-    rdds.mapPartitions({ rows=>
+    rdd.mapPartitions({ rows=>
       // XGBoost refuses to load our binary format if rabit has been
       // initialized, so we do it early. This make the odd situation
       // where we need to dispose of them before rabit is shutdown.
diff --git a/jvm/src/test/scala/org/wikimedia/search/mjolnir/DBNSuite.scala b/jvm/src/test/scala/org/wikimedia/search/mjolnir/DBNSuite.scala
index 2ee6ed6..2a6f7b7 100644
--- a/jvm/src/test/scala/org/wikimedia/search/mjolnir/DBNSuite.scala
+++ b/jvm/src/test/scala/org/wikimedia/search/mjolnir/DBNSuite.scala
@@ -159,9 +159,11 @@
    assert(math.abs(sessionEstimate.s.sum - sessionEstimate.s(0) - sessionEstimate.s(10)) < 0.0001D)
   }
 
+  private val FULL_BENCHMARK = false
+
   // Takes ~1.5s on my laptop versus 90 seconds in python
   test("basic benchmark") {
-    val nQueries = 5000
+    val nQueries = if (FULL_BENCHMARK) 5000 else 100
     val nSessionsPerQuery = 20
     val nIterations = 40
     val nResultsPerQuery = 20
diff --git a/jvm/src/test/scala/org/wikimedia/search/mjolnir/DataWriterSuite.scala b/jvm/src/test/scala/org/wikimedia/search/mjolnir/DataWriterSuite.scala
index 4a17e78..ca85260 100644
--- a/jvm/src/test/scala/org/wikimedia/search/mjolnir/DataWriterSuite.scala
+++ b/jvm/src/test/scala/org/wikimedia/search/mjolnir/DataWriterSuite.scala
@@ -26,6 +26,10 @@
       .withColumn("features", randVec(5)())
       .withColumn("fold", randLong(numFolds)().as("fold", meta))
       .withColumn("query", randString(queries)())
+      // Must have more partitions than numFolds above
+      // or coalesce won't do anything.
+      .repartition(numFolds * 2, F.col("query"))
+      .sortWithinPartitions("query")
   }
 
   test("Write out various standard fold configs as text files") {
@@ -42,7 +46,7 @@
         val pattern = s"$testDir/%s-fold-%s-partition-%d"
         val writer = new DataWriter(spark.sparkContext)
         val folds = writer.write(df, numWorkers, pattern, foldCol)
-        // We only wrote an "all" fold, so top level should have a single item
+
         assert(folds.length == expectedFolds)
         folds.foreach { fold =>
           assert(fold.length == numWorkers)
@@ -52,6 +56,7 @@
             // Items expected but not in partition
             assert(expectedSplits.diff(partition.keySet).isEmpty)
             partition.values.foreach { path =>
+              // Paths should actually exist
               assert(Files.exists(Paths.get(path.substring("file:".length))))
             }
           }
diff --git a/mjolnir/test/conftest.py b/mjolnir/test/conftest.py
index bf2ad8c..0ab729c 100644
--- a/mjolnir/test/conftest.py
+++ b/mjolnir/test/conftest.py
@@ -24,6 +24,27 @@
     return os.path.join(cur_dir, 'fixtures')
 
 
[email protected]
+def folds_a(fixtures_dir):
+    fixtures_dir = os.path.join(fixtures_dir, 'datasets')
+    return [
+        [{"train": os.path.join(fixtures_dir, "train.xgb"), "test": 
os.path.join(fixtures_dir, "test.xgb")}]
+    ]
+
+
[email protected]
+def folds_b(fixtures_dir):
+    fixtures_dir = os.path.join(fixtures_dir, 'datasets')
+
+    def f(path):
+        return os.path.join(fixtures_dir, path + ".xgb")
+
+    return [
+        [{"train": f("train.f0.p0")}, {"train": f("train.f0.p1")}],
+        [{"train": f("train.f1.p0")}, {"train": f("train.f1.p1")}]
+    ]
+
+
 @pytest.fixture(scope="session")
 def spark_context(request):
     """Fixture for creating a spark context.
@@ -49,8 +70,8 @@
         .setAppName("pytest-pyspark-local-testing")
         # Maven coordinates of jvm dependencies
         .set('spark.jars.packages', ','.join([
-            'ml.dmlc:xgboost4j-spark:0.7-wmf-1',
-            'org.wikimedia.search:mjolnir:0.2',
+            'ml.dmlc:xgboost4j-spark:0.8-wmf-1',
+            'org.wikimedia.search:mjolnir:0.3',
             'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0']))
         # By default spark will shuffle to 200 partitions, which is
         # way too many for our small test cases. This cuts execution
diff --git a/mjolnir/test/fixtures/datasets/test.xgb b/mjolnir/test/fixtures/datasets/test.xgb
new file mode 100644
index 0000000..d3d6afa
--- /dev/null
+++ b/mjolnir/test/fixtures/datasets/test.xgb
@@ -0,0 +1,100 @@
+3 1:0.326636 2:0.077529 3:0.371918 4:0.380291 5:0.014513
+0 1:0.972146 2:0.158156 3:0.914378 4:0.862571 5:0.320560
+1 1:0.298956 2:0.364456 3:0.883224 4:0.396381 5:0.793094
+3 1:0.661035 2:0.051392 3:0.516250 4:0.451712 5:0.944109
+0 1:0.864322 2:0.525315 3:0.786576 4:0.989264 5:0.083457
+3 1:0.404920 2:0.319258 3:0.998247 4:0.337145 5:0.418243
+0 1:0.510521 2:0.187725 3:0.330769 4:0.927407 5:0.335392
+3 1:0.814285 2:0.251424 3:0.506982 4:0.762243 5:0.100247
+3 1:0.117167 2:0.819767 3:0.709812 4:0.796283 5:0.540541
+3 1:0.897631 2:0.580800 3:0.026243 4:0.964434 5:0.231140
+2 1:0.391528 2:0.683021 3:0.997876 4:0.568064 5:0.010284
+2 1:0.454713 2:0.288310 3:0.848797 4:0.748149 5:0.522314
+1 1:0.168367 2:0.309259 3:0.664556 4:0.824906 5:0.370062
+0 1:0.650610 2:0.220118 3:0.321737 4:0.612518 5:0.071387
+1 1:0.411739 2:0.803898 3:0.042743 4:0.842187 5:0.391775
+2 1:0.807920 2:0.979387 3:0.307194 4:0.988092 5:0.529102
+2 1:0.755759 2:0.365076 3:0.885247 4:0.565210 5:0.615610
+0 1:0.254644 2:0.437205 3:0.257438 4:0.279958 5:0.175046
+3 1:0.444070 2:0.378370 3:0.610439 4:0.936986 5:0.287545
+0 1:0.967863 2:0.092285 3:0.987719 4:0.792169 5:0.690630
+2 1:0.455959 2:0.806951 3:0.713977 4:0.546367 5:0.950988
+0 1:0.790699 2:0.858631 3:0.611960 4:0.836762 5:0.386819
+0 1:0.240780 2:0.842705 3:0.766013 4:0.317449 5:0.359710
+1 1:0.783596 2:0.017780 3:0.765471 4:0.225174 5:0.000262
+1 1:0.144028 2:0.529792 3:0.203593 4:0.102651 5:0.774630
+1 1:0.886765 2:0.524375 3:0.555517 4:0.347809 5:0.076116
+3 1:0.094031 2:0.956022 3:0.274482 4:0.054051 5:0.856636
+1 1:0.921116 2:0.567716 3:0.406270 4:0.066023 5:0.419928
+1 1:0.768755 2:0.520684 3:0.138103 4:0.003975 5:0.975453
+2 1:0.936436 2:0.675702 3:0.286579 4:0.053526 5:0.297008
+1 1:0.304488 2:0.246547 3:0.547152 4:0.094481 5:0.680314
+1 1:0.058460 2:0.155417 3:0.608897 4:0.400933 5:0.805939
+0 1:0.537291 2:0.740986 3:0.590492 4:0.608429 5:0.549569
+0 1:0.981997 2:0.140494 3:0.061062 4:0.045810 5:0.401585
+2 1:0.369939 2:0.285637 3:0.853111 4:0.225219 5:0.426097
+3 1:0.908313 2:0.569428 3:0.497288 4:0.491016 5:0.006803
+2 1:0.179035 2:0.642798 3:0.625024 4:0.927981 5:0.405290
+1 1:0.239090 2:0.710597 3:0.179604 4:0.371392 5:0.449195
+1 1:0.579927 2:0.728269 3:0.206559 4:0.424931 5:0.713960
+3 1:0.649605 2:0.731400 3:0.631224 4:0.175438 5:0.304366
+2 1:0.669838 2:0.153425 3:0.771762 4:0.553746 5:0.040279
+0 1:0.436897 2:0.473752 3:0.795377 4:0.820149 5:0.190161
+0 1:0.640871 2:0.516977 3:0.592973 4:0.490025 5:0.577195
+2 1:0.997780 2:0.095394 3:0.651330 4:0.749575 5:0.660816
+3 1:0.152661 2:0.704911 3:0.947403 4:0.870216 5:0.798095
+1 1:0.747997 2:0.075223 3:0.939461 4:0.643620 5:0.914241
+2 1:0.133448 2:0.430715 3:0.789519 4:0.610028 5:0.903299
+0 1:0.401159 2:0.373494 3:0.659331 4:0.294644 5:0.629553
+0 1:0.207960 2:0.361213 3:0.911492 4:0.554985 5:0.493094
+1 1:0.057594 2:0.713257 3:0.446922 4:0.420421 5:0.567548
+1 1:0.887591 2:0.980055 3:0.615915 4:0.974575 5:0.590911
+3 1:0.727584 2:0.967858 3:0.491858 4:0.012659 5:0.283389
+3 1:0.021647 2:0.394032 3:0.764637 4:0.590971 5:0.827838
+3 1:0.620332 2:0.833978 3:0.671682 4:0.236934 5:0.858537
+1 1:0.256581 2:0.496616 3:0.825439 4:0.632813 5:0.164283
+3 1:0.442709 2:0.256347 3:0.231125 4:0.436597 5:0.587120
+2 1:0.464535 2:0.337670 3:0.452877 4:0.892539 5:0.001143
+3 1:0.734347 2:0.180984 3:0.214319 4:0.363408 5:0.644541
+2 1:0.787623 2:0.952571 3:0.030398 4:0.049772 5:0.242738
+3 1:0.953805 2:0.961399 3:0.837528 4:0.898172 5:0.862682
+1 1:0.731495 2:0.847731 3:0.557340 4:0.431582 5:0.900491
+2 1:0.726246 2:0.828104 3:0.761575 4:0.919778 5:0.119979
+2 1:0.934568 2:0.153811 3:0.337450 4:0.940157 5:0.040626
+0 1:0.483256 2:0.741818 3:0.447331 4:0.580388 5:0.378437
+2 1:0.350344 2:0.330684 3:0.712829 4:0.787849 5:0.380805
+3 1:0.962389 2:0.991490 3:0.913410 4:0.158044 5:0.180574
+1 1:0.464680 2:0.328260 3:0.684814 4:0.092428 5:0.477978
+3 1:0.717346 2:0.441867 3:0.444311 4:0.014475 5:0.951867
+1 1:0.829431 2:0.888164 3:0.221868 4:0.171388 5:0.296757
+1 1:0.952573 2:0.183402 3:0.666165 4:0.527787 5:0.412143
+2 1:0.229767 2:0.820025 3:0.615662 4:0.820428 5:0.603999
+3 1:0.982424 2:0.041073 3:0.796715 4:0.015130 5:0.543146
+2 1:0.929637 2:0.825426 3:0.727255 4:0.677794 5:0.093995
+2 1:0.253085 2:0.925220 3:0.464014 4:0.905004 5:0.860812
+3 1:0.551381 2:0.561334 3:0.478454 4:0.881930 5:0.117634
+3 1:0.584691 2:0.779480 3:0.264777 4:0.778328 5:0.787311
+2 1:0.047838 2:0.133641 3:0.556090 4:0.565904 5:0.211877
+2 1:0.557917 2:0.261338 3:0.125646 4:0.258342 5:0.632127
+2 1:0.396906 2:0.526099 3:0.922920 4:0.899431 5:0.437990
+2 1:0.133891 2:0.128417 3:0.841312 4:0.671067 5:0.483595
+3 1:0.710280 2:0.183570 3:0.528577 4:0.940885 5:0.543355
+3 1:0.884920 2:0.375900 3:0.851078 4:0.021749 5:0.808034
+0 1:0.998045 2:0.539014 3:0.476142 4:0.714477 5:0.674686
+1 1:0.051806 2:0.881044 3:0.620053 4:0.324929 5:0.658388
+3 1:0.274004 2:0.305911 3:0.293288 4:0.200069 5:0.898098
+0 1:0.856834 2:0.648173 3:0.159354 4:0.792070 5:0.728352
+2 1:0.123342 2:0.957180 3:0.621960 4:0.480828 5:0.102538
+0 1:0.768011 2:0.718725 3:0.152013 4:0.877257 5:0.184134
+3 1:0.043845 2:0.106492 3:0.343883 4:0.476853 5:0.931994
+0 1:0.840979 2:0.724733 3:0.132602 4:0.660006 5:0.172654
+1 1:0.790630 2:0.387497 3:0.146874 4:0.272060 5:0.230415
+0 1:0.483095 2:0.769878 3:0.038515 4:0.773712 5:0.406844
+2 1:0.611834 2:0.054520 3:0.097503 4:0.852499 5:0.350596
+1 1:0.319831 2:0.741203 3:0.445348 4:0.305239 5:0.076114
+1 1:0.064030 2:0.230787 3:0.716318 4:0.777473 5:0.473709
+0 1:0.030735 2:0.086434 3:0.230371 4:0.166334 5:0.729940
+0 1:0.443625 2:0.148763 3:0.777603 4:0.469975 5:0.918963
+1 1:0.011012 2:0.474785 3:0.837294 4:0.976318 5:0.813301
+3 1:0.143489 2:0.537931 3:0.422035 4:0.032870 5:0.770511
+3 1:0.531663 2:0.379978 3:0.993619 4:0.126227 5:0.036455
diff --git a/mjolnir/test/fixtures/datasets/test.xgb.query b/mjolnir/test/fixtures/datasets/test.xgb.query
new file mode 100644
index 0000000..ea19cda
--- /dev/null
+++ b/mjolnir/test/fixtures/datasets/test.xgb.query
@@ -0,0 +1,7 @@
+16
+12
+12
+20
+12
+16
+12
diff --git a/mjolnir/test/fixtures/datasets/train.xgb b/mjolnir/test/fixtures/datasets/train.xgb
new file mode 100644
index 0000000..c079891
--- /dev/null
+++ b/mjolnir/test/fixtures/datasets/train.xgb
@@ -0,0 +1,100 @@
+2 1:0.276857 2:0.309246 3:0.443840 4:0.323302 5:0.251910
+0 1:0.696140 2:0.637534 3:0.229497 4:0.280137 5:0.617490
+2 1:0.413772 2:0.395534 3:0.429219 4:0.603654 5:0.517427
+0 1:0.661208 2:0.568080 3:0.704676 4:0.597076 5:0.923187
+0 1:0.448723 2:0.282579 3:0.554448 4:0.515220 5:0.520206
+0 1:0.690696 2:0.517868 3:0.721864 4:0.683105 5:0.645097
+0 1:0.028015 2:0.758854 3:0.510872 4:0.041371 5:0.938664
+2 1:0.628253 2:0.164138 3:0.174485 4:0.660370 5:0.661200
+3 1:0.156874 2:0.244076 3:0.834069 4:0.816389 5:0.267347
+0 1:0.598002 2:0.961438 3:0.115683 4:0.176997 5:0.802739
+0 1:0.210183 2:0.094221 3:0.315377 4:0.104599 5:0.078312
+2 1:0.155581 2:0.239757 3:0.390496 4:0.448591 5:0.363550
+0 1:0.842447 2:0.449796 3:0.699602 4:0.775102 5:0.551474
+1 1:0.282622 2:0.493173 3:0.308711 4:0.588665 5:0.015217
+1 1:0.849260 2:0.716784 3:0.801057 4:0.702118 5:0.467375
+1 1:0.318216 2:0.502648 3:0.139761 4:0.320343 5:0.677026
+1 1:0.496001 2:0.137648 3:0.962708 4:0.638394 5:0.922043
+0 1:0.113989 2:0.482813 3:0.614473 4:0.398461 5:0.411334
+3 1:0.910096 2:0.742803 3:0.802974 4:0.242235 5:0.741078
+3 1:0.026262 2:0.153817 3:0.866324 4:0.152764 5:0.079376
+2 1:0.866166 2:0.176590 3:0.501546 4:0.550038 5:0.252295
+2 1:0.635001 2:0.421436 3:0.400276 4:0.960437 5:0.958324
+0 1:0.511912 2:0.465321 3:0.391040 4:0.536889 5:0.262131
+3 1:0.061608 2:0.844460 3:0.775436 4:0.940628 5:0.472095
+2 1:0.968002 2:0.142278 3:0.746172 4:0.167117 5:0.888660
+1 1:0.264066 2:0.675206 3:0.761167 4:0.605750 5:0.458382
+3 1:0.848002 2:0.323477 3:0.985868 4:0.032662 5:0.831382
+2 1:0.173251 2:0.098697 3:0.002584 4:0.172282 5:0.467190
+3 1:0.464708 2:0.120618 3:0.178631 4:0.107419 5:0.458566
+3 1:0.085904 2:0.100485 3:0.439993 4:0.253806 5:0.225162
+3 1:0.309424 2:0.219766 3:0.979150 4:0.992466 5:0.781002
+2 1:0.978544 2:0.831527 3:0.924340 4:0.472260 5:0.854699
+1 1:0.838158 2:0.539927 3:0.033527 4:0.939022 5:0.858939
+0 1:0.478958 2:0.035299 3:0.321513 4:0.138026 5:0.344530
+1 1:0.420183 2:0.050261 3:0.168622 4:0.987962 5:0.081127
+3 1:0.529119 2:0.756857 3:0.525388 4:0.906647 5:0.015031
+2 1:0.757490 2:0.697273 3:0.672960 4:0.038027 5:0.567640
+1 1:0.877515 2:0.997738 3:0.140903 4:0.022510 5:0.275767
+1 1:0.707713 2:0.767267 3:0.146578 4:0.177046 5:0.390119
+2 1:0.126545 2:0.338924 3:0.290230 4:0.140717 5:0.193812
+1 1:0.438441 2:0.413887 3:0.715906 4:0.718078 5:0.037032
+1 1:0.394099 2:0.898491 3:0.851562 4:0.854452 5:0.200452
+3 1:0.554398 2:0.927987 3:0.506810 4:0.361222 5:0.339552
+1 1:0.651350 2:0.642931 3:0.992078 4:0.394123 5:0.356481
+2 1:0.323378 2:0.726388 3:0.278780 4:0.873152 5:0.760571
+2 1:0.134356 2:0.635057 3:0.085017 4:0.454603 5:0.756119
+2 1:0.980191 2:0.788963 3:0.817764 4:0.653687 5:0.556327
+0 1:0.008553 2:0.637250 3:0.401720 4:0.094888 5:0.483444
+3 1:0.422236 2:0.459310 3:0.469951 4:0.514213 5:0.806955
+2 1:0.794112 2:0.424444 3:0.341827 4:0.791480 5:0.355004
+1 1:0.650646 2:0.831209 3:0.360482 4:0.730525 5:0.253130
+0 1:0.899328 2:0.843065 3:0.668982 4:0.386870 5:0.767945
+3 1:0.459478 2:0.172103 3:0.115004 4:0.861834 5:0.497636
+3 1:0.502739 2:0.717756 3:0.324993 4:0.854486 5:0.691303
+3 1:0.694347 2:0.975218 3:0.749809 4:0.080061 5:0.216270
+0 1:0.204749 2:0.152912 3:0.364078 4:0.440324 5:0.064159
+0 1:0.731656 2:0.371814 3:0.359377 4:0.176542 5:0.820773
+0 1:0.247573 2:0.992306 3:0.796591 4:0.586521 5:0.681643
+0 1:0.057699 2:0.006356 3:0.445668 4:0.924739 5:0.316846
+2 1:0.542624 2:0.136532 3:0.997968 4:0.995610 5:0.035732
+1 1:0.394307 2:0.992210 3:0.872835 4:0.150469 5:0.909376
+2 1:0.857816 2:0.033190 3:0.880868 4:0.565811 5:0.665156
+1 1:0.942718 2:0.660032 3:0.178236 4:0.182812 5:0.087367
+2 1:0.831711 2:0.563728 3:0.384249 4:0.217499 5:0.294353
+0 1:0.876942 2:0.456118 3:0.673524 4:0.541833 5:0.584563
+3 1:0.576608 2:0.133179 3:0.125881 4:0.989455 5:0.918416
+1 1:0.750385 2:0.262965 3:0.308398 4:0.490595 5:0.058171
+2 1:0.650555 2:0.260526 3:0.103875 4:0.015988 5:0.965965
+0 1:0.256881 2:0.816037 3:0.178294 4:0.502192 5:0.982004
+2 1:0.197509 2:0.074589 3:0.638905 4:0.277399 5:0.396911
+2 1:0.354089 2:0.236236 3:0.093984 4:0.792231 5:0.763705
+1 1:0.616673 2:0.861175 3:0.073603 4:0.534483 5:0.935103
+3 1:0.251242 2:0.234976 3:0.922202 4:0.126030 5:0.462835
+1 1:0.120315 2:0.277710 3:0.911577 4:0.730788 5:0.657184
+2 1:0.106240 2:0.919003 3:0.650803 4:0.052300 5:0.717360
+2 1:0.047706 2:0.206002 3:0.439120 4:0.411402 5:0.153675
+2 1:0.573659 2:0.866770 3:0.863335 4:0.203366 5:0.234077
+1 1:0.035194 2:0.398743 3:0.510657 4:0.028964 5:0.619880
+3 1:0.729706 2:0.133357 3:0.864867 4:0.806695 5:0.681677
+3 1:0.745570 2:0.384475 3:0.758124 4:0.269669 5:0.134004
+2 1:0.625335 2:0.962249 3:0.365400 4:0.905473 5:0.574668
+0 1:0.127824 2:0.574420 3:0.925172 4:0.861584 5:0.651584
+0 1:0.283864 2:0.524622 3:0.822224 4:0.572662 5:0.054927
+0 1:0.634046 2:0.067980 3:0.154102 4:0.392269 5:0.630872
+0 1:0.715898 2:0.914851 3:0.767055 4:0.929095 5:0.545982
+1 1:0.265327 2:0.815794 3:0.787845 4:0.316523 5:0.931774
+3 1:0.989390 2:0.000113 3:0.033949 4:0.230697 5:0.298948
+1 1:0.894869 2:0.368693 3:0.138898 4:0.941772 5:0.320418
+0 1:0.230322 2:0.525224 3:0.071963 4:0.762348 5:0.378543
+1 1:0.615923 2:0.670154 3:0.398374 4:0.978306 5:0.445780
+2 1:0.225093 2:0.960227 3:0.620164 4:0.878473 5:0.018904
+0 1:0.527845 2:0.985475 3:0.661959 4:0.950368 5:0.760041
+3 1:0.153984 2:0.333098 3:0.228054 4:0.466053 5:0.466788
+1 1:0.721555 2:0.533151 3:0.648141 4:0.394710 5:0.959431
+2 1:0.838903 2:0.029601 3:0.941272 4:0.642579 5:0.999925
+3 1:0.704906 2:0.658367 3:0.698945 4:0.545432 5:0.448200
+1 1:0.244696 2:0.192652 3:0.219868 4:0.399418 5:0.217660
+1 1:0.834653 2:0.236075 3:0.051846 4:0.215874 5:0.286917
+0 1:0.391280 2:0.895263 3:0.587608 4:0.880774 5:0.779338
+2 1:0.215979 2:0.890866 3:0.158514 4:0.359504 5:0.023049
diff --git a/mjolnir/test/fixtures/datasets/train.xgb.query b/mjolnir/test/fixtures/datasets/train.xgb.query
new file mode 100644
index 0000000..52d733e
--- /dev/null
+++ b/mjolnir/test/fixtures/datasets/train.xgb.query
@@ -0,0 +1,7 @@
+18
+14
+14
+12
+16
+16
+10
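[Editor's note] The fixture pairs above follow xgboost's text input convention: the .xgb file holds one libsvm-style row per line (`label idx:value ...`), and the companion .query file lists per-query group sizes that must sum to the row count (here 16+12+12+20+12+16+12 = 100 rows in test.xgb, and 18+14+14+12+16+16+10 = 100 in train.xgb). A small illustrative sanity check, not part of the change:

```python
def parse_libsvm_line(line):
    """Split 'label idx:value ...' into (int label, {idx: float value})."""
    label, *feats = line.split()
    return int(label), {int(i): float(v) for i, v in (f.split(":") for f in feats)}


def groups_cover_rows(num_rows, group_sizes):
    """The .query group sizes must partition the rows of the .xgb file."""
    return all(g > 0 for g in group_sizes) and sum(group_sizes) == num_rows
```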
diff --git a/mjolnir/test/fixtures/load_config/example_train.expect b/mjolnir/test/fixtures/load_config/example_train.expect
index 52bea1b..3199df6 100644
--- a/mjolnir/test/fixtures/load_config/example_train.expect
+++ b/mjolnir/test/fixtures/load_config/example_train.expect
@@ -31,12 +31,51 @@
         executor-memory: 2G
         files: /usr/lib/libhdfs.so.0.0.0
         master: yarn
-        packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+        packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
        repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
       spark_command: /usr/lib/spark2/bin/spark-submit
       spark_conf:
        spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080
           -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080
+        spark.task.cpus: '1'
+        spark.yarn.executor.memoryOverhead: '512'
+    make_folds:
+      cmd_args:
+        input: hdfs://analytics-hadoop/user/pytest/mjolnir/marker
+        max-executors: '50'
+        num-folds: '5'
+        num-workers: '1'
+        output-dir: hdfs://analytics-hadoop/user/pytest/mjolnir/marker-folds_global
+      environment:
+        HOME: /home/pytest
+        PYSPARK_PYTHON: venv/bin/python
+        SPARK_CONF_DIR: /etc/spark2/conf
+        SPARK_HOME: /usr/lib/spark2
+        USER: pytest
+      mjolnir_utility: make_folds
+      mjolnir_utility_path: /srv/mjolnir/venv/bin/mjolnir-utilities.py
+      paths:
+        dir_exist: !!set
+          /etc/spark2/conf: null
+        file_exist: !!set
+          /srv/mjolnir/mjolnir_venv.zip: null
+          /srv/mjolnir/venv/bin/mjolnir-utilities.py: null
+          /usr/lib/spark2/bin/spark-submit: null
+          venv/bin/python: null
+      spark_args:
+        archives: /srv/mjolnir/mjolnir_venv.zip#venv
+        executor-cores: '1'
+        executor-memory: 2G
+        files: /usr/lib/libhdfs.so.0.0.0
+        master: yarn
+        packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+        repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
+      spark_command: /usr/lib/spark2/bin/spark-submit
+      spark_conf:
+        spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080
+          -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080
+        spark.dynamicAllocation.maxExecutors: '100'
+        spark.locality.wait: '0'
         spark.task.cpus: '1'
         spark.yarn.executor.memoryOverhead: '512'
     pyspark:
@@ -60,7 +99,7 @@
         executor-memory: 2G
         files: /usr/lib/libhdfs.so.0.0.0
         master: yarn
-        packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+        packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
        repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
       spark_command: /usr/lib/spark2/bin/pyspark
       spark_conf:
@@ -89,7 +128,7 @@
         executor-memory: 2G
         files: /usr/lib/libhdfs.so.0.0.0
         master: yarn
-        packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+        packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
         repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
       spark_command: /usr/lib/spark2/bin/pyspark
       spark_conf:
@@ -99,8 +138,8 @@
         spark.yarn.executor.memoryOverhead: '6144'
     training_pipeline:
       cmd_args:
-        input: hdfs://analytics-hadoop/user/pytest/mjolnir/marker
-        output: /home/pytest/training_size/marker_global
+        input: hdfs://analytics-hadoop/user/pytest/mjolnir/marker-folds_global
+        output: /home/pytest/training_size/marker-folds_global
       environment:
         HOME: /home/pytest
         PYSPARK_PYTHON: venv/bin/python
@@ -125,14 +164,13 @@
         executor-memory: 2G
         files: /usr/lib/libhdfs.so.0.0.0
         master: yarn
-        packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+        packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
         repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
       spark_command: /usr/lib/spark2/bin/spark-submit
       spark_conf:
         spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080
           -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080
         spark.dynamicAllocation.executorIdleTimeout: 180s
-        spark.sql.autoBroadcastJoinThreshold: '-1'
         spark.task.cpus: '1'
         spark.yarn.executor.memoryOverhead: '512'
   wikis: []
@@ -170,7 +208,7 @@
           executor-memory: 2G
           files: /usr/lib/libhdfs.so.0.0.0
           master: yarn
-          packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+          packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
           repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
         spark_command: /usr/lib/spark2/bin/spark-submit
         spark_conf:
@@ -178,6 +216,45 @@
             -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080
           spark.task.cpus: '1'
           spark.yarn.executor.memoryOverhead: '512'
+      make_folds:
+        cmd_args:
+          input: hdfs://analytics-hadoop/user/pytest/mjolnir/marker
+          max-executors: '50'
+          num-folds: '3'
+          num-workers: '1'
+          output-dir: hdfs://analytics-hadoop/user/pytest/mjolnir/marker-folds_large
+        environment:
+          HOME: /home/pytest
+          PYSPARK_PYTHON: venv/bin/python
+          SPARK_CONF_DIR: /etc/spark2/conf
+          SPARK_HOME: /usr/lib/spark2
+          USER: pytest
+        mjolnir_utility: make_folds
+        mjolnir_utility_path: /srv/mjolnir/venv/bin/mjolnir-utilities.py
+        paths:
+          dir_exist: !!set
+            /etc/spark2/conf: null
+          file_exist: !!set
+            /srv/mjolnir/mjolnir_venv.zip: null
+            /srv/mjolnir/venv/bin/mjolnir-utilities.py: null
+            /usr/lib/spark2/bin/spark-submit: null
+            venv/bin/python: null
+        spark_args:
+          archives: /srv/mjolnir/mjolnir_venv.zip#venv
+          executor-cores: '1'
+          executor-memory: 2G
+          files: /usr/lib/libhdfs.so.0.0.0
+          master: yarn
+          packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+          repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
+        spark_command: /usr/lib/spark2/bin/spark-submit
+        spark_conf:
+          spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080
+            -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080
+          spark.dynamicAllocation.maxExecutors: '100'
+          spark.locality.wait: '0'
+          spark.task.cpus: '1'
+          spark.yarn.executor.memoryOverhead: '5120'
       pyspark:
         environment:
           HOME: /home/pytest
@@ -199,7 +276,7 @@
           executor-memory: 2G
           files: /usr/lib/libhdfs.so.0.0.0
           master: yarn
-          packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+          packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
           repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
         spark_command: /usr/lib/spark2/bin/pyspark
         spark_conf:
@@ -228,7 +305,7 @@
           executor-memory: 2G
           files: /usr/lib/libhdfs.so.0.0.0
           master: yarn
-          packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+          packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
           repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
         spark_command: /usr/lib/spark2/bin/pyspark
         spark_conf:
@@ -238,12 +315,10 @@
           spark.yarn.executor.memoryOverhead: '6144'
       training_pipeline:
         cmd_args:
-          cv-jobs: '22'
+          cv-jobs: '80'
           final-trees: '100'
-          folds: '3'
-          input: hdfs://analytics-hadoop/user/pytest/mjolnir/marker
-          output: /home/pytest/training_size/marker_large
-          workers: '3'
+          input: hdfs://analytics-hadoop/user/pytest/mjolnir/marker-folds_large
+          output: /home/pytest/training_size/marker-folds_large
         environment:
           HOME: /home/pytest
           PYSPARK_PYTHON: venv/bin/python
@@ -265,20 +340,19 @@
           archives: /srv/mjolnir/mjolnir_venv.zip#venv
           driver-memory: 3G
           executor-cores: '6'
-          executor-memory: 4G
+          executor-memory: 2G
           files: /usr/lib/libhdfs.so.0.0.0
           master: yarn
-          packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+          packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
           repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
         spark_command: /usr/lib/spark2/bin/spark-submit
         spark_conf:
           spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080
             -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080
           spark.dynamicAllocation.executorIdleTimeout: 180s
-          spark.dynamicAllocation.maxExecutors: '65'
-          spark.sql.autoBroadcastJoinThreshold: '-1'
+          spark.dynamicAllocation.maxExecutors: '85'
           spark.task.cpus: '6'
-          spark.yarn.executor.memoryOverhead: '9216'
+          spark.yarn.executor.memoryOverhead: '6144'
     wikis:
     - enwiki
     - dewiki
@@ -315,7 +389,7 @@
           executor-memory: 2G
           files: /usr/lib/libhdfs.so.0.0.0
           master: yarn
-          packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+          packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
           repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
         spark_command: /usr/lib/spark2/bin/spark-submit
         spark_conf:
@@ -323,6 +397,45 @@
             -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080
           spark.task.cpus: '1'
           spark.yarn.executor.memoryOverhead: '512'
+      make_folds:
+        cmd_args:
+          input: hdfs://analytics-hadoop/user/pytest/mjolnir/marker
+          max-executors: '50'
+          num-folds: '5'
+          num-workers: '1'
+          output-dir: hdfs://analytics-hadoop/user/pytest/mjolnir/marker-folds_medium
+        environment:
+          HOME: /home/pytest
+          PYSPARK_PYTHON: venv/bin/python
+          SPARK_CONF_DIR: /etc/spark2/conf
+          SPARK_HOME: /usr/lib/spark2
+          USER: pytest
+        mjolnir_utility: make_folds
+        mjolnir_utility_path: /srv/mjolnir/venv/bin/mjolnir-utilities.py
+        paths:
+          dir_exist: !!set
+            /etc/spark2/conf: null
+          file_exist: !!set
+            /srv/mjolnir/mjolnir_venv.zip: null
+            /srv/mjolnir/venv/bin/mjolnir-utilities.py: null
+            /usr/lib/spark2/bin/spark-submit: null
+            venv/bin/python: null
+        spark_args:
+          archives: /srv/mjolnir/mjolnir_venv.zip#venv
+          executor-cores: '1'
+          executor-memory: 2G
+          files: /usr/lib/libhdfs.so.0.0.0
+          master: yarn
+          packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+          repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
+        spark_command: /usr/lib/spark2/bin/spark-submit
+        spark_conf:
+          spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080
+            -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080
+          spark.dynamicAllocation.maxExecutors: '100'
+          spark.locality.wait: '0'
+          spark.task.cpus: '1'
+          spark.yarn.executor.memoryOverhead: '1536'
       pyspark:
         environment:
           HOME: /home/pytest
@@ -344,7 +457,7 @@
           executor-memory: 2G
           files: /usr/lib/libhdfs.so.0.0.0
           master: yarn
-          packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+          packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
           repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
         spark_command: /usr/lib/spark2/bin/pyspark
         spark_conf:
@@ -373,7 +486,7 @@
           executor-memory: 2G
           files: /usr/lib/libhdfs.so.0.0.0
           master: yarn
-          packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+          packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
           repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
         spark_command: /usr/lib/spark2/bin/pyspark
         spark_conf:
@@ -383,12 +496,10 @@
           spark.yarn.executor.memoryOverhead: '6144'
       training_pipeline:
         cmd_args:
-          cv-jobs: '70'
+          cv-jobs: '125'
           final-trees: '100'
-          folds: '5'
-          input: hdfs://analytics-hadoop/user/pytest/mjolnir/marker
-          output: /home/pytest/training_size/marker_medium
-          workers: '1'
+          input: hdfs://analytics-hadoop/user/pytest/mjolnir/marker-folds_medium
+          output: /home/pytest/training_size/marker-folds_medium
         environment:
           HOME: /home/pytest
           PYSPARK_PYTHON: venv/bin/python
@@ -409,21 +520,20 @@
         spark_args:
           archives: /srv/mjolnir/mjolnir_venv.zip#venv
           driver-memory: 3G
-          executor-cores: '6'
-          executor-memory: 3G
+          executor-cores: '4'
+          executor-memory: 2G
           files: /usr/lib/libhdfs.so.0.0.0
           master: yarn
-          packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+          packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
           repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
         spark_command: /usr/lib/spark2/bin/spark-submit
         spark_conf:
           spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080
             -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080
           spark.dynamicAllocation.executorIdleTimeout: 180s
-          spark.dynamicAllocation.maxExecutors: '75'
-          spark.sql.autoBroadcastJoinThreshold: '-1'
-          spark.task.cpus: '6'
-          spark.yarn.executor.memoryOverhead: '9216'
+          spark.dynamicAllocation.maxExecutors: '130'
+          spark.task.cpus: '4'
+          spark.yarn.executor.memoryOverhead: '2048'
     wikis:
     - itwiki
     - ptwiki
@@ -462,7 +572,7 @@
           executor-memory: 2G
           files: /usr/lib/libhdfs.so.0.0.0
           master: yarn
-          packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+          packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
           repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
         spark_command: /usr/lib/spark2/bin/spark-submit
         spark_conf:
@@ -470,6 +580,45 @@
             -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080
           spark.task.cpus: '1'
           spark.yarn.executor.memoryOverhead: '512'
+      make_folds:
+        cmd_args:
+          input: hdfs://analytics-hadoop/user/pytest/mjolnir/marker
+          max-executors: '50'
+          num-folds: '5'
+          num-workers: '1'
+          output-dir: hdfs://analytics-hadoop/user/pytest/mjolnir/marker-folds_small
+        environment:
+          HOME: /home/pytest
+          PYSPARK_PYTHON: venv/bin/python
+          SPARK_CONF_DIR: /etc/spark2/conf
+          SPARK_HOME: /usr/lib/spark2
+          USER: pytest
+        mjolnir_utility: make_folds
+        mjolnir_utility_path: /srv/mjolnir/venv/bin/mjolnir-utilities.py
+        paths:
+          dir_exist: !!set
+            /etc/spark2/conf: null
+          file_exist: !!set
+            /srv/mjolnir/mjolnir_venv.zip: null
+            /srv/mjolnir/venv/bin/mjolnir-utilities.py: null
+            /usr/lib/spark2/bin/spark-submit: null
+            venv/bin/python: null
+        spark_args:
+          archives: /srv/mjolnir/mjolnir_venv.zip#venv
+          executor-cores: '1'
+          executor-memory: 2G
+          files: /usr/lib/libhdfs.so.0.0.0
+          master: yarn
+          packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+          repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
+        spark_command: /usr/lib/spark2/bin/spark-submit
+        spark_conf:
+          spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080
+            -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080
+          spark.dynamicAllocation.maxExecutors: '100'
+          spark.locality.wait: '0'
+          spark.task.cpus: '1'
+          spark.yarn.executor.memoryOverhead: '1024'
       pyspark:
         environment:
           HOME: /home/pytest
@@ -491,7 +640,7 @@
           executor-memory: 2G
           files: /usr/lib/libhdfs.so.0.0.0
           master: yarn
-          packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+          packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
           repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
         spark_command: /usr/lib/spark2/bin/pyspark
         spark_conf:
@@ -520,7 +669,7 @@
           executor-memory: 2G
           files: /usr/lib/libhdfs.so.0.0.0
           master: yarn
-          packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+          packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
           repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
         spark_command: /usr/lib/spark2/bin/pyspark
         spark_conf:
@@ -530,12 +679,10 @@
           spark.yarn.executor.memoryOverhead: '6144'
       training_pipeline:
         cmd_args:
-          cv-jobs: '100'
+          cv-jobs: '125'
           final-trees: '500'
-          folds: '5'
-          input: hdfs://analytics-hadoop/user/pytest/mjolnir/marker
-          output: /home/pytest/training_size/marker_small
-          workers: '1'
+          input: hdfs://analytics-hadoop/user/pytest/mjolnir/marker-folds_small
+          output: /home/pytest/training_size/marker-folds_small
         environment:
           HOME: /home/pytest
           PYSPARK_PYTHON: venv/bin/python
@@ -560,17 +707,16 @@
           executor-memory: 2G
           files: /usr/lib/libhdfs.so.0.0.0
           master: yarn
-          packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+          packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
           repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
         spark_command: /usr/lib/spark2/bin/spark-submit
         spark_conf:
           spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080
             -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080
           spark.dynamicAllocation.executorIdleTimeout: 180s
-          spark.dynamicAllocation.maxExecutors: '105'
-          spark.sql.autoBroadcastJoinThreshold: '-1'
+          spark.dynamicAllocation.maxExecutors: '130'
           spark.task.cpus: '4'
-          spark.yarn.executor.memoryOverhead: '6144'
+          spark.yarn.executor.memoryOverhead: '1024'
     wikis:
     - svwiki
     - fawiki
@@ -582,7 +728,6 @@
     - fiwiki
     - jawiki
     - arwiki
-    - itwiki
     - nlwiki
     - zhwiki
     - plwiki
diff --git a/mjolnir/test/test_utils.py b/mjolnir/test/test_utils.py
new file mode 100644
index 0000000..69fecab
--- /dev/null
+++ b/mjolnir/test/test_utils.py
@@ -0,0 +1,69 @@
+from contextlib import contextmanager
+from copy import deepcopy
+import mjolnir.utils
+import pytest
+
+
+class FailTestException(Exception):
+    pass
+
+
+def make_multi_manager(pre_raise, post_raise, transitions):
+    pre_raise = list(pre_raise)
+    post_raise = list(post_raise)
+
+    @contextmanager
+    def f(data):
+        if pre_raise.pop(0):
+            transitions.append(('pre-raise', data))
+            raise FailTestException('pre-raise: ' + str(data))
+        transitions.append(('begin', data))
+        yield data
+        transitions.append(('end', data))
+        if post_raise.pop(0):
+            transitions.append(('post raise', data))
+            raise FailTestException('post raise: ' + str(data))
+
+    return mjolnir.utils.multi_with(f)
+
+
+def generate_fixtures():
+    tests = []
+    n = 3
+
+    success = [False] * n
+    expect_success = [
+        success,
+        success,
+        [('begin', i) for i in range(n)] +
+        [('inside', None)] +
+        [('end', i) for i in range(n)]]
+
+    tests.append(expect_success)
+
+    expect_failure = deepcopy(expect_success)
+    expect_failure[0][1] = True
+    expect_failure[2] = [('begin', 0), ('pre-raise', 1), ('end', 0)]
+    tests.append(expect_failure)
+
+    expect_failure = deepcopy(expect_success)
+    expect_failure[0][n - 1] = True
+    expect_failure[2] = [('begin', i) for i in range(n - 1)] + \
+        [('pre-raise', n - 1)] + \
+        [('end', i) for i in range(n-1)]
+    tests.append(expect_failure)
+
+    return ('pre_raise,post_raise,expected', tests)
+
+
+@pytest.mark.parametrize(*generate_fixtures())
+def test_multi_with_cleanup(pre_raise, post_raise, expected):
+    n = 3
+    transitions = []
+    g = make_multi_manager(pre_raise, post_raise, transitions)
+    try:
+        with g(list(range(n))):
+            transitions.append(('inside', None))
+    except FailTestException:
+        pass
+    assert expected == transitions
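An aside for readers of this new test file: `mjolnir.utils.multi_with` itself is not part of this hunk. The following is a hypothetical reconstruction, inferred only from the transitions the tests assert, namely entry-order cleanup, and already-entered managers still being closed when a later `__enter__` raises; the real implementation may differ, and this sketch simplifies by not forwarding body exceptions into the per-item managers.

```python
from contextlib import contextmanager


def multi_with(ctx_factory):
    # Hypothetical sketch of multi_with: lift a per-item context manager
    # into a single context manager over a list of items.
    @contextmanager
    def inner(items):
        managers = []
        try:
            entered = []
            for item in items:
                m = ctx_factory(item)
                # __enter__ may raise; m is only recorded once it succeeds,
                # so a failed item is never "exited".
                entered.append(m.__enter__())
                managers.append(m)
            yield entered
        finally:
            # Clean up every successfully entered manager, in entry order,
            # matching the ('end', 0), ('end', 1), ... order the tests expect.
            for m in managers:
                m.__exit__(None, None, None)
    return inner
```

Under this reading, a pre-yield failure in the Nth manager still unwinds managers 0..N-1, which is exactly what `test_multi_with_cleanup` checks.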
diff --git a/mjolnir/test/training/test_hyperopt.py b/mjolnir/test/training/test_hyperopt.py
index 6ef6f8a..c4c4782 100644
--- a/mjolnir/test/training/test_hyperopt.py
+++ b/mjolnir/test/training/test_hyperopt.py
@@ -20,7 +20,7 @@
     ).toDF(['wikiid', 'norm_query_id', 'query', 'label', 'features'])
 
 
-def test_minimize(df_train):
+def test_minimize(folds_b):
     "Not an amazing test...basically sees if the happy path doesnt blow up"
     space = {
         'num_rounds': 50,
@@ -31,8 +31,7 @@
     # xgboost is separately tested. Instead of going all the way into xgboost
     # mock it out w/MockModel.
     best_params, trails = mjolnir.training.hyperopt.minimize(
-        df_train, MockModel, space, max_evals=5, num_folds=2,
-        num_workers=1)
+        folds_b, MockModel, space, max_evals=5)
     assert isinstance(best_params, dict)
     # num_rounds should have been unchanged
     assert 'num_rounds' in best_params
@@ -41,13 +40,23 @@
     assert len(trails.trials) == 5
 
 
+class MockSummary(object):
+    def train(self):
+        return [1.]
+
+    def test(self):
+        return [1.]
+
+
 class MockModel(object):
-    def __init__(self, df, params, num_workers):
+    def __init__(self, df, params, train_matrix=None):
         # Params that were passed to hyperopt
         assert isinstance(params, dict)
         assert 'max_depth' in params
         assert params['num_rounds'] == 50
-        assert num_workers == 1
 
     def eval(self, df_test, j_groups=None, feature_col='features', label_col='label'):
         return 1.0
+
+    def summary(self):
+        return MockSummary()
diff --git a/mjolnir/test/training/test_tuning.py b/mjolnir/test/training/test_tuning.py
index eaa35b6..e14982b 100644
--- a/mjolnir/test/training/test_tuning.py
+++ b/mjolnir/test/training/test_tuning.py
@@ -47,14 +47,12 @@
     ).toDF(['wikiid', 'norm_query_id', 'query', 'label', 'features'])
 
 
-def test_cross_validate_plain_df(df_train):
+def test_cross_validate_plain_df(folds_a):
     scores = mjolnir.training.tuning.cross_validate(
-        df_train,
+        folds_a,
         mjolnir.training.xgboost.train,
         {'objective': 'rank:ndcg', 'eval_metric': 'ndcg@3', 'num_rounds': 1},
-        # xgboost needs all jobs to have a worker assigned before it will
-        # finish a round of training, so we have to be careful not to use
-        # too many workers
-        num_folds=2, num_workers=1, pool=None)
+        pool=None)
     # one score for each fold
-    assert len(scores) == 2
+    for fold, score in zip(folds_a, scores):
+        assert fold[0].keys() == score.keys()
diff --git a/mjolnir/test/training/test_xgboost.py b/mjolnir/test/training/test_xgboost.py
index c4642d0..e49e496 100644
--- a/mjolnir/test/training/test_xgboost.py
+++ b/mjolnir/test/training/test_xgboost.py
@@ -57,6 +57,30 @@
 
 
 @pytest.fixture()
+def train_path(fixtures_dir):
+    """
+    Fake data generated with:
+
+    data = [(random.randint(0,3), [random.random() for i in range(5)]) for i in range(100)]
+    with open('train.xgb', 'w') as f:
+      for label, features in data:
+        f.write("%d %s\n" % (label, " ".join(["%d:%f" % (i+1, feat) for i, feat in enumerate(features)])))
+    with open('train.xgb.query', 'w') as f:
+       i = 0
+       while i < len(data):
+           next = min(len(data) - i, random.randint(10,20))
+           f.write("%d\n" % (next))
+           i += next
+    """
+    return fixtures_dir + '/datasets/train.xgb'
+
+
+@pytest.fixture()
+def test_path(fixtures_dir):
+    return fixtures_dir + '/datasets/test.xgb'
+
+
+@pytest.fixture()
 def df_train(spark_context, hive_context):
     rdd1 = spark_context.parallelize([
         ('foowiki', 'foo', 2, Vectors.dense([2.2])),
@@ -73,11 +97,11 @@
         .toDF(['wikiid', 'query', 'label', 'features']))
 
 
-def test_train_minimum_params(df_train):
+def test_train_minimum_params(df_train, folds_a):
     params = {'num_rounds': 1}
     # TODO: Anything > 1 worker can easily get stuck, as a system
-    # with 2 cpus will only spawn a single worker.
-    model = mjolnir.training.xgboost.train(df_train, params, num_workers=1)
+    # with 2 cpus will only spawn a single executor.
+    model = mjolnir.training.xgboost.train(folds_a[0], params, 'train')
 
     # What else can we practically assert?
     df_transformed = model.transform(df_train)
@@ -86,25 +110,3 @@
 
     # make sure train didn't clobber the incoming params
     assert params['num_rounds'] == 1
-
-
-def _always_raise(*args, **kwargs):
-    raise ValueError('should not be called')
-
-
-def test_train_pre_prepped(df_train):
-    num_workers = 1
-    params = {'num_rounds': 1}
-
-    df_grouped, j_groups = mjolnir.training.xgboost.prep_training(
-        df_train, num_workers)
-    params['groupData'] = j_groups
-
-    # TODO: This is probably not how we should make sure it isn't called..
-    orig_prep_training = mjolnir.training.xgboost.prep_training
-    try:
-        mjolnir.training.xgboost.prep_training = _always_raise
-        model = mjolnir.training.xgboost.train(df_grouped, params)
-        assert 0.74 == pytest.approx(model.eval(df_grouped, j_groups), abs=0.01)
-    finally:
-        mjolnir.training.xgboost.prep_training = orig_prep_training
diff --git a/mjolnir/training/hyperopt.py b/mjolnir/training/hyperopt.py
index b4001eb..9976bb3 100644
--- a/mjolnir/training/hyperopt.py
+++ b/mjolnir/training/hyperopt.py
@@ -80,13 +80,13 @@
         return state
 
 
-def minimize(df, train_func, space, max_evals=50, algo=hyperopt.tpe.suggest,
-             num_folds=5, num_workers=5, cv_pool=None, trials_pool=None):
+def minimize(folds, train_func, space, max_evals=50, algo=hyperopt.tpe.suggest,
+             cv_pool=None, trials_pool=None):
     """Perform cross validated hyperparameter optimization of train_func
 
     Parameters
     ----------
-    df : pyspark.sql.DataFrame
+    folds : list of dict each with two keys, train and test
         Features and Labels to optimize over
     train_func : callable
         Function to use for training individual models
@@ -97,10 +97,6 @@
     algo : callable
         The algorithm to use with hyperopt. See docs of hyperopt.fmin for more
         details.
-    num_folds : int
-        Number of folds to split df into for cross validation
-    num_workers : int
-        Number of executors to use for each model training
     cv_pool : multiprocessing.dummy.Pool or None
         Controls the number of models to run in parallel. If None models
         are trained sequentially.
@@ -117,13 +113,13 @@
     """
 
     def objective(params):
-        scores = mjolnir.training.tuning._cross_validate(
-            folds, train_func, params, num_workers=num_workers,
-            pool=cv_pool)
+        scores = mjolnir.training.tuning.cross_validate(
+            folds, train_func, params, pool=cv_pool)
         # For now the only metric is NDCG, and hyperopt is a minimizer
-        # so return the negative NDCG
-        loss = [-s['test'] for s in scores]
-        true_loss = [s['train'] - s['test'] for s in scores]
+        # so return the negative NDCG. Also makes the bold assumption
+        # we had at least two pieces of the fold named 'test' and 'train'
+        loss = [-s['test'][-1] for s in scores]
+        true_loss = [s['train'][-1] - s['test'][-1] for s in scores]
         num_failures = sum([math.isnan(s) for s in loss])
         if num_failures > 1:
             return {
@@ -141,21 +137,13 @@
             'true_loss_variance': np.var(true_loss),
         }
 
-    folds = mjolnir.training.tuning._make_folds(
-        df, num_folds=num_folds, num_workers=num_workers, pool=cv_pool)
+    if trials_pool is None:
+        trials = hyperopt.Trials()
+    else:
+        trials = ThreadingTrials(trials_pool)
 
-    try:
-        if trials_pool is None:
-            trials = hyperopt.Trials()
-        else:
-            trials = ThreadingTrials(trials_pool)
-        best = hyperopt.fmin(objective, space, algo=algo,
-                             max_evals=max_evals, trials=trials)
-    finally:
-        for fold in folds:
-            fold['train'].unpersist()
-            fold['test'].unpersist()
-
+    best = hyperopt.fmin(objective, space, algo=algo,
+                         max_evals=max_evals, trials=trials)
     # hyperopt only returns the non-constant parameters in best. It seems
     # more convenient to return all of them.
     best_merged = space.copy()
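A note on the scoring change above: `minimize` now indexes `s['test'][-1]` and `s['train'][-1]` rather than treating scores as scalars, which implies each `cross_validate` entry maps a fold-piece name to a per-round metric history. A minimal sketch of that assumed shape (the numbers here are illustrative, not from the change):

```python
# Assumed shape only: one dict per fold, mapping fold piece name to the
# per-round eval metric history (e.g. ndcg after each boosting round).
scores = [
    {'train': [0.61, 0.68, 0.72], 'test': [0.58, 0.63, 0.65]},
    {'train': [0.60, 0.67, 0.71], 'test': [0.57, 0.62, 0.64]},
]

# hyperopt minimizes, so the objective negates the final-round test metric;
# true_loss estimates overfitting as the final-round train/test gap.
loss = [-s['test'][-1] for s in scores]
true_loss = [s['train'][-1] - s['test'][-1] for s in scores]
```

This also explains the comment in the objective about assuming the fold has at least the two pieces named 'train' and 'test'.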
diff --git a/mjolnir/training/tuning.py b/mjolnir/training/tuning.py
index d76fd1b..f106f88 100644
--- a/mjolnir/training/tuning.py
+++ b/mjolnir/training/tuning.py
@@ -93,6 +93,7 @@
     ----------
     df : pyspark.sql.DataFrame
     num_folds : int
+    output_column : str, optional
 
     Yields
     ------
@@ -103,71 +104,6 @@
         .withColumn(output_column, mjolnir.spark.add_meta(df._sc, F.col(output_column), {
             'num_folds': num_folds,
         })))
-
-
-def _make_folds(df, num_folds, num_workers, pool):
-    """Transform a DataFrame with assigned folds into many dataframes.
-
-    The results of split and group_k_fold emit a single dataframe with folds
-    marked on the individual rows. To do the resulting training we need individual
-    dataframes for each test/train split within the folds. If the data has
-    not already had folds assigned they will be assigned based on the 'num_folds'
-    key in params. If not present 5 folds will be used.
-
-    Also generates the appropriate group data for xgboost. This doesn't
-    necessarily belong here, but it is relatively expensive to calculate, so we
-    benefit significantly by doing it once before hyperparameter tuning, as
-    opposed to doing it for each iteration.
-
-    Parameters
-    ----------
-    df : pyspark.sql.DataFrame
-        Input dataframe with a 'fold' column indicating which fold each row
-        belongs to.
-    num_folds : int
-        Number of folds to create. If a 'fold' column already exists in df
-        this will be ignored.
-    num_workers : int
-        Number of workers used to train each model. This is passed onto
-        xgboost.prep_training to prepare each fold.
-    pool : multiprocessing.dummy.Pool or None
-        Used to prepare folds in parallel. If not provided folds will be
-        generated sequentially.
-
-    Returns
-    -------
-    list
-        Generates a list of dicts, one for each fold. Each dict contains
-        train and test keys containing DataFrames, along with j_train_groups
-        and j_test_groups keys which contain py4j JavaObject instances
-        corresponding to group data needed by xgboost for train/eval. The train
-        and test dataframes have been persisted, so should be unpersisted
-        when no longer needed.
-    """
-    if 'fold' in df.columns:
-        assert num_folds == df.schema['fold'].metadata['num_folds']
-        df_folds = df
-    else:
-        df_folds = group_k_fold(df, num_folds)
-
-    def job(fold):
-        condition = F.col('fold') == fold
-        # TODO: de-couple xgboost from cv generation.
-        df_train, j_train_groups = mjolnir.training.xgboost.prep_training(
-                df_folds.where(~condition), num_workers)
-        df_test, j_test_groups = mjolnir.training.xgboost.prep_training(
-                df_folds.where(condition), num_workers)
-        return {
-            'train': df_train,
-            'test': df_test,
-            'j_train_groups': j_train_groups,
-            'j_test_groups': j_test_groups
-        }
-
-    if pool is None:
-        return map(job, range(num_folds))
-    else:
-        return pool.map(job, range(num_folds))
 
 
 def _py4j_retry(fn, default_retval):
@@ -190,15 +126,14 @@
     return with_retry
 
 
-def _cross_validate(folds, train_func, params, num_workers, pool):
+def cross_validate(folds, train_func, params, pool):
     """Perform cross validation of the provided folds
 
     Parameters
     ----------
-    folds : list
+    folds : list of dict containing train and test keys
     train_func : callable
     params : dict
-    num_workers : int
     pool : multiprocessing.dummy.Pool or None
 
     Returns
@@ -206,55 +141,19 @@
     list
     """
     def job(fold):
-        local_params = params.copy()
-        local_params['groupData'] = fold['j_train_groups']
-        model = train_func(fold['train'], local_params, num_workers=num_workers)
+        model = train_func(fold, params)
+        # TODO: Summary is hardcoded to train/test
         return {
-            'train': model.eval(fold['train'], fold['j_train_groups']),
-            'test': model.eval(fold['test'], fold['j_test_groups']),
+            "train": model.summary().train(),
+            "test": model.summary().test(),
         }
 
     job_w_retry = _py4j_retry(job, {
-        'train': float('nan'),
-        'test': float('nan'),
+        "train": [float('nan')],
+        "test": [float('nan')],
     })
 
     if pool is None:
         return map(job_w_retry, folds)
     else:
         return pool.map(job_w_retry, folds)
-
-
-def cross_validate(df, train_func, params, num_folds=5, num_workers=5, pool=None):
-    """Perform cross-validation of the dataframe
-
-    Parameters
-    ----------
-    df : pyspark.sql.DataFrame or list
-    train_func : callable
-        Function used to train a model. Must return a model that
-        implements an eval method
-    params : dict
-        parameters to pass on to train_func
-    num_folds : int
-        Number of folds to split df into for cross validation
-    num_workers : int
-        Number of executors to use for each model training
-    pool : multiprocessing.dummy.Pool, optional
-        Used to prepare folds and run cross validations in parallel. If
-        not provided all work will be done sequentially.
-
-    Returns
-    -------
-    list
-        List of dicts, each dict containing a train and test key. The values
-        correspond the the model evaluation metric for the train and test
-        data frames.
-    """
-    folds = _make_folds(df, num_folds, num_workers, pool)
-    try:
-        return _cross_validate(folds, train_func, params, num_workers=num_workers, pool=pool)
-    finally:
-        for fold in folds:
-            fold['train'].unpersist()
-            fold['test'].unpersist()
diff --git a/mjolnir/training/xgboost.py b/mjolnir/training/xgboost.py
index 2e30a36..984c0e6 100644
--- a/mjolnir/training/xgboost.py
+++ b/mjolnir/training/xgboost.py
@@ -1,29 +1,22 @@
 from __future__ import absolute_import
+import functools
 import hyperopt
 import math
 import mjolnir.spark
 import mjolnir.training.hyperopt
 from multiprocessing.dummy import Pool
 import numpy as np
+import pyspark
 import pyspark.sql
 from pyspark.sql import functions as F
 import tempfile
 
-# Example Command line:
-# PYSPARK_PYTHON=venv/bin/python SPARK_CONF_DIR=/etc/spark/conf ~/spark-2.1.0-bin-hadoop2.6/bin/pyspark \
-#     --master yarn \
-#     --jars ~/mjolnir_2.11-1.0.jar \
-#     --driver-class-path ~/mjolnir_2.11-1.0.jar \
-#     --archives 'mjolnir_venv.zip#venv' \
-#     --files /usr/lib/libhdfs.so.0.0.0 \
-#     --executor-cores 4 \
-#     --executor-memory 4G \
-#     --conf spark.dynamicAllocation.maxExecutors=40 \
-#     --conf spark.task.cpus=4
-
 
 def prep_training(df, num_partitions=None):
     """Prepare a dataframe for training
+
+    This is no longer used for training. It can still be used to run
+    predictions or evaluations on a model. Training uses the make_folds
+    utility now.
 
     Ranking models in XGBoost require rows for the same query to be provided
     consecutively within a single partition. It additionally requires a
@@ -106,70 +99,62 @@
     return retval
 
 
-def train(df, params, num_workers=None):
+def train(fold, params, train_matrix=None):
     """Train a single xgboost ranking model.
 
-    df : pyspark.sql.DataFrame
-        Training data
+    fold: dict
+        map from split names to list of data partitions
     params : dict
         parameters to pass on to xgboost
-    num_workers : int, optional
-        The number of executors to train with. If not provided then
-        'groupData' *must* be present in params and num_workers will
-        be set to the number of partitions in df.
 
     Returns
     -------
     XGBoostModel
         Trained xgboost model
     """
-
     # hyperparameter tuning may have given us floats where we need
     # ints, so this gets all the types right for Java. Also makes
     # a copy of params so we don't modify the incoming dict.
     params = _coerce_params(params)
     # TODO: Maybe num_rounds should just be external? But it's easier
     # to do hyperparameter optimization with a consistent dict interface
-    num_rounds = params['num_rounds']
-    del params['num_rounds']
+    kwargs = {
+        'num_rounds': 100,
+        'early_stopping_round': 0,
+    }
+    if 'num_rounds' in params:
+        kwargs['num_rounds'] = params['num_rounds']
+        del params['num_rounds']
+    if 'early_stopping_round' in params:
+        kwargs['early_stopping_round'] = params['early_stopping_round']
+        del params['early_stopping_round']
+
     # Set some sane defaults for ranking tasks
     if 'objective' not in params:
         params['objective'] = 'rank:ndcg'
     if 'eval_metric' not in params:
         params['eval_metric'] = 'ndcg@10'
 
-    unpersist = False
-    if num_workers is None:
-        num_workers = df.rdd.getNumPartitions()
-        if 'groupData' in params:
-            assert params['groupData'].length() == num_workers
-            df_grouped = df
+    # Convenience for some situations, but callers should typically be
+    # explicit about the name of the matrix to train against.
+    if train_matrix is None:
+        train_matrix = "all" if "all" in fold else "train"
+
+    return XGBoostModel.trainWithFiles(fold, train_matrix, params, **kwargs)
+
+
+class XGBoostSummary(object):
+    def __init__(self, j_xgb_summary):
+        self._j_xgb_summary = j_xgb_summary
+
+    def train(self):
+        return list(self._j_xgb_summary.trainObjectiveHistory())
+
+    def test(self):
+        if self._j_xgb_summary.testObjectiveHistory().isEmpty():
+            return None
         else:
-            df_grouped, j_groups = prep_training(df, num_workers)
-            unpersist = True
-            params['groupData'] = j_groups
-    elif 'groupData' in params:
-        df_grouped = df
-    else:
-        df_grouped, j_groups = prep_training(df, num_workers)
-        unpersist = True
-        params['groupData'] = j_groups
-
-    # We must have the same number of partitions here as workers the model will
-    # be trained with, or xgboost4j-spark will repartition and the c++ library
-    # will throw an exception. It's much cleaner to fail-fast here rather than
-    # figuring out c++ errors through JNI from remote workers.
-    assert df_grouped.rdd.getNumPartitions() == num_workers
-    assert 'groupData' in params
-    assert params['groupData'].length() == num_workers
-
-    try:
-        return XGBoostModel.trainWithDataFrame(df_grouped, params, num_rounds,
-                                               num_workers, 
feature_col='features',
-                                               label_col='label')
-    finally:
-        if unpersist:
-            df_grouped.unpersist()
+            return list(self._j_xgb_summary.testObjectiveHistory().get())
 
 
 class XGBoostModel(object):
@@ -177,9 +162,8 @@
         self._j_xgb_model = j_xgb_model
 
     @staticmethod
-    def trainWithDataFrame(trainingData, params, num_rounds, num_workers, objective=None,
-                           eval_metric=None, missing=float('nan'),
-                           feature_col='features', label_col='label'):
+    def trainWithFiles(fold, train_matrix, params, num_rounds=100,
+                       early_stopping_round=0):
         """Wrapper around scala XGBoostModel.trainWithRDD
 
         This intentionally forwards to trainWithRDD, rather than
@@ -188,39 +172,34 @@
 
         Parameters
         ----------
-        trainingData : pyspark.sql.DataFrame
+        fold: dict
+            map from string name to list of data files for the split
+        train_matrix: str
+            name of split in fold to train against
         params : dict
+            XGBoost training parameters
         num_rounds : int
-        num_workers : int
-        objective : py4j.java_gateway.JavaObject, optional
-            Allows providing custom objective implementation. (Default: None)
-        eval_metric : py4j.java_gateway.JavaObject, optional
-            Allows providing a custom evaluation metric implementation.
-            (Default: None)
-        missing : float, optional
-            The value representing the missing value in the dataset. features
-            with this value will be removed and the vectors treated as sparse.
-            (Default: nan)
-        feature_col : string, optional
-            The dataframe column holding feature vectors. (Default: features)
-        label_col : string, optional
-            The dataframe column holding labels. (Default: label)
+            Maximum number of boosting rounds to perform
+        early_stopping_round : int, optional
+            Quit training after this many rounds with no improvement in
+            test set eval. 0 disables this behaviour. (Default: 0)
 
         Returns
         -------
         mjolnir.training.xgboost.XGBoostModel
             trained xgboost ranking model
         """
-        sc = trainingData._sc
+        sc = pyspark.SparkContext.getOrCreate()
+        # Type is Seq[Map[String, String]]
+        j_fold = sc._jvm.PythonUtils.toSeq([sc._jvm.PythonUtils.toScalaMap(x) for x in fold])
+        # Type is Map[String, Any]
         j_params = sc._jvm.scala.collection.immutable.HashMap()
         for k, v in params.items():
             j_params = j_params.updated(k, v)
 
-        j_rdd = sc._jvm.org.wikimedia.search.mjolnir.PythonUtils.toLabeledPoints(
-            trainingData._jdf, feature_col, label_col)
-
-        j_xgb_model = sc._jvm.ml.dmlc.xgboost4j.scala.spark.XGBoost.trainWithRDD(
-            j_rdd, j_params, num_rounds, num_workers,
-            objective, eval_metric, False, missing)
+        j_xgb_model = sc._jvm.org.wikimedia.search.mjolnir.MlrXGBoost.trainWithFiles(
+            sc._jsc, j_fold, train_matrix, j_params, num_rounds,
+            early_stopping_round)
         return XGBoostModel(j_xgb_model)
 
     def transform(self, df_test):
@@ -312,6 +291,9 @@
         score = self._j_xgb_model.eval(j_rdd, 'test', None, 0, False, j_groups)
         return float(score.split('=')[1].strip())
 
+    def summary(self):
+        return XGBoostSummary(self._j_xgb_model.summary())
+
     def saveModelAsHadoopFile(self, sc, path):
         j_sc = sc._jvm.org.apache.spark.api.java.JavaSparkContext.toSparkContext(sc._jsc)
         self._j_xgb_model.saveModelAsHadoopFile(path, j_sc)
@@ -333,7 +315,7 @@
         return XGBoostModel(j_xgb_model)
 
 
-def tune(df, num_folds=5, num_cv_jobs=5, num_workers=5, initial_num_trees=100, final_num_trees=500):
+def tune(folds, stats, train_matrix, num_cv_jobs=5, initial_num_trees=100, final_num_trees=500):
     """Find appropriate hyperparameters for training df
 
     This is far from perfect, hyperparameter tuning is a bit of a black art
@@ -352,16 +334,11 @@
 
     Parameters
     ----------
-    df : pyspark.sql.DataFrame
-    num_folds : int, optional
-        The number of cross validation folds to use while tuning. (Default: 5)
+    folds : list of dict containing train and test keys
+    stats : dict
+        stats about the fold from the make_folds utility script
     num_cv_jobs : int, optional
         The number of cross validation folds to train in parallel. (Default: 5)
-    num_workers : int, optional
-        The number of spark executors to use per fold for training. The total
-        number of executors used will be (num_cv_jobs * num_workers). Generally
-        prefer executors with more cpu's over a higher number of workers where
-        possible. (Default: 5)
     initial_num_trees: int, optional
         The number of trees to do most of the hyperparameter tuning with. This
         should be large enough to be reasonably representative of the final
@@ -379,26 +356,33 @@
         performed, each containing a hyperopt.Trials object recording what
         happened.
     """
+    cv_pool = None
+    if num_cv_jobs > 1:
+        cv_pool = Pool(num_cv_jobs)
 
-    cv_pool = Pool(num_cv_jobs)
-    trials_pool_size = int(math.floor(num_cv_jobs / num_folds))
+    # Configure the trials pool large enough to keep cv_pool full
+    num_folds = len(folds)
+    num_workers = len(folds[0])
+    trials_pool_size = int(math.floor(num_cv_jobs / (num_folds * num_workers)))
     if trials_pool_size > 1:
         print 'Running %d cross validations in parallel' % (trials_pool_size)
         trials_pool = Pool(trials_pool_size)
     else:
         trials_pool = None
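
The pool sizing above divides the total cross-validation budget by the number of slots one trial consumes (folds times workers per fold). A small helper mirroring that arithmetic, with hypothetical numbers:

```python
import math

def pool_sizes(num_cv_jobs, num_folds, num_workers):
    """Mirror of the sizing logic above: how many folds can train at
    once, and how many whole trials can run concurrently."""
    cv_pool_size = num_cv_jobs if num_cv_jobs > 1 else 0
    trials_pool_size = int(math.floor(num_cv_jobs / (num_folds * num_workers)))
    return cv_pool_size, trials_pool_size

# e.g. 30 executor slots, 5 folds of 2 workers each -> 3 concurrent trials
assert pool_sizes(30, 5, 2) == (30, 3)
# With only 1 slot neither pool is created
assert pool_sizes(1, 5, 1) == (0, 0)
```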
 
+    train_func = functools.partial(train, train_matrix=train_matrix)
+
     def eval_space(space, max_evals):
         """Eval a space using standard hyperopt"""
         best, trials = mjolnir.training.hyperopt.minimize(
-            df, train, space, max_evals=max_evals, num_folds=num_folds,
-            num_workers=num_workers, cv_pool=cv_pool, trials_pool=trials_pool)
+            folds, train_func, space, max_evals=max_evals,
+            cv_pool=cv_pool, trials_pool=trials_pool)
         for k, v in space.items():
             if not np.isscalar(v):
                 print 'best %s: %f' % (k, best[k])
         return best, trials
 
-    num_obs = df.count()
+    num_obs = stats['num_observations']
 
     if num_obs > 8000000:
         dataset_size = 'xlarge'
diff --git a/mjolnir/utilities/make_folds.py b/mjolnir/utilities/make_folds.py
new file mode 100644
index 0000000..c7ac04d
--- /dev/null
+++ b/mjolnir/utilities/make_folds.py
@@ -0,0 +1,205 @@
+"""
+Using the outputs of data_pipeline.py split our training data
+into multiple folds with a test/train split for each. Save these
+splits, to hdfs if requested, in the appropriate training algorithm
+binary format.
+"""
+from __future__ import absolute_import
+import argparse
+import collections
+import json
+import multiprocessing.dummy
+import mjolnir.feature_engineering
+import mjolnir.training.tuning
+from mjolnir.utils import as_local_paths, as_output_file, as_output_files, \
+                          hdfs_mkdir, hdfs_unlink
+import os
+from pyspark import SparkContext
+from pyspark.sql import functions as F, HiveContext
+import xgboost
+
+
+def summarize_training_df(df, data_size):
+    if data_size > 10000000:
+        df = df.repartition(200)
+    summary = collections.defaultdict(dict)
+    for row in mjolnir.feature_engineering.explode_features(df).describe().collect():
+        statistic = row.summary
+        for field, value in (x for x in row.asDict().items() if x[0] != 'summary'):
+            summary[field][statistic] = value
+    return dict(summary)
+
+
+def make_df_stats(df, data_size):
+    return {
+        'num_observations': data_size,
+        'num_queries': df.select('query').drop_duplicates().count(),
+        'num_norm_queries': df.select('norm_query_id').drop_duplicates().count(),
+        'features': df.schema['features'].metadata['features'],
+        'summary': summarize_training_df(df, data_size),
+    }
+
+
+def write_xgb(in_path, out_path):
+    # Because xgboost hates us, reading a group file from disk
+    # is only supported in cli. From the c_api we have to provide it.
+    with open(in_path + '.query') as query:
+        query_boundaries = [int(line) for line in query]
+    # We cannot access the jvm from executors, only the driver,
+    # which means jvm xgboost is not available. For this one limited
+    # use case we must also have the python xgboost package.
+    dmat = xgboost.DMatrix(in_path)
+    dmat.set_group(query_boundaries)
+    dmat.save_binary(out_path)
+
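
The `.query` side-car file consumed by `write_xgb` holds one group size per line, following xgboost's group-file convention. A minimal sketch of that parsing step, without the xgboost dependency:

```python
def read_query_file(lines):
    """Parse group sizes (one integer per line) the way write_xgb
    reads the .query side-car file."""
    return [int(line) for line in lines if line.strip()]

# Three queries with 4, 2 and 3 result rows respectively
groups = read_query_file(["4\n", "2\n", "3\n"])
assert groups == [4, 2, 3]
# The group sizes must account for every row in the matrix
assert sum(groups) == 9
```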
+
+def write_wiki_folds(sc, df, num_workers, fold_col, path_format, features):
+    def write_binaries(rows):
+        # each row is a dict from split name (train/test) to the path where its data can be found
+        for pair in rows:
+            i, row = pair
+            with as_local_paths(row.values(), with_query=True) as local_inputs, \
+                    as_output_files([path + '.xgb' for path in row.values()]) as local_outputs:
+                for local_input, local_output in zip(local_inputs, local_outputs):
+                    write_xgb(local_input, local_output.name)
+
+    # Write out as text files from scala, much faster than shuffling to python
+    writer = sc._jvm.org.wikimedia.search.mjolnir.DataWriter(sc._jsc)
+    j_paths = writer.write(df._jdf, num_workers, path_format, fold_col)
+
+    # Convert everything to python objects
+    # in scala this is Array[Array[Map[String, String]]]
+    all_paths = []
+    for j_fold in j_paths:
+        fold = []
+        all_paths.append(fold)
+        for j_partition in j_fold:
+            partition = {str(k): str(v) for k, v in dict(j_partition).items()}
+            fold.append(partition)
+
+    # Enumerate gives a partition id used by partitionBy below
+    # This isn't a partition id of the data, but the stage we are making.
+    all_splits = list(enumerate(partition for fold in all_paths for partition in fold))
+    # For all the emitted folds create binary data files
+    sc.parallelize(all_splits, 1).partitionBy(len(all_splits), lambda x: x).foreachPartition(write_binaries)
+    # Cleanup the text/query output, keeping only the binary data files
+    hdfs_paths = []
+    for i, partition in all_splits:
+        for path in partition.values():
+            for extension in ['', '.query']:
+                hdfs_paths.append(path + extension)
+    hdfs_unlink(*hdfs_paths)
+    return all_paths
+
+
+def write_wiki_all(*args):
+    return write_wiki_folds(*args)[0]
+
+
+def make_folds(sc, sqlContext, input_dir, output_dir, wikis, zero_features, num_folds, num_workers, max_executors):
+    hdfs_mkdir(output_dir)
+    df = sqlContext.read.parquet(input_dir) \
+        .select('wikiid', 'query', 'features', 'label', 'norm_query_id')
+    if wikis:
+        df = df.where(F.col('wikiid').isin(wikis))
+
+    counts = df.groupBy('wikiid').agg(F.count(F.lit(1)).alias('n_obs')).collect()
+    counts = {row.wikiid: row.n_obs for row in counts}
+
+    if not wikis:
+        wikis = counts.keys()
+    else:
+        missing = set(wikis).difference(counts.keys())
+        for wiki in missing:
+            print 'No observations available for ' + wiki
+        wikis = list(set(wikis).intersection(counts.keys()))
+    if not wikis:
+        raise Exception('No wikis provided')
+
+    # sort to descending size, so mapping over them does the largest first
+    wikis.sort(reverse=True, key=lambda wiki: counts[wiki])
+
+    if zero_features:
+        df = mjolnir.feature_engineering.zero_features(df, zero_features)
+    if max_executors is None:
+        max_executors = num_workers
+
+    # TODO: Limit size?
+    pool = multiprocessing.dummy.Pool(len(wikis) * 3)
+    features = df.schema['features'].metadata['features']
+
+    df_fold = (
+        mjolnir.training.tuning.group_k_fold(df, num_folds)
+        .repartition(200, 'wikiid', 'query')
+        .sortWithinPartitions('wikiid', 'query'))
+
+    try:
+        df_fold.cache()
+        df_fold.count()
+
+        wiki_stats = {}
+        for wiki in wikis:
+            df_wiki = df_fold.where(F.col('wikiid') == wiki).drop('wikiid')
+            path_format = os.path.join(output_dir, wiki + '.%s.f%s.p%d')
+            wiki_stats[wiki] = {
+                'all': pool.apply_async(write_wiki_all, (sc, df_wiki, num_workers, None, path_format, features)),
+                'folds': pool.apply_async(write_wiki_folds, (sc, df_wiki, num_workers, 'fold', path_format, features)),
+                'stats': pool.apply_async(make_df_stats, (df_wiki, counts[wiki])),
+            }
+
+        wiki_stats = {wiki: {k: v.get() for k, v in stats.items()} for wiki, stats in wiki_stats.items()}
+        for wiki in wikis:
+            wiki_stats[wiki]['num_folds'] = num_folds
+            wiki_stats[wiki]['num_workers'] = num_workers
+    finally:
+        df_fold.unpersist()
+
+    with as_output_file(os.path.join(output_dir, 'stats.json')) as f:
+        f.write(json.dumps({
+            'input_dir': input_dir,
+            'wikis': wiki_stats,
+        }))
+
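
The stats.json written here is the contract later read by training_pipeline.py. A hedged sketch of its shape, with field names taken from this change and all values illustrative:

```python
import json

# Illustrative stats.json payload as assembled by make_folds; paths
# and numbers are hypothetical placeholders.
stats = {
    'input_dir': 'hdfs://example/training_data',
    'wikis': {
        'enwiki': {
            'num_folds': 5,
            'num_workers': 1,
            'stats': {'num_observations': 1000, 'features': ['f1', 'f2']},
            'folds': [[{'train': '.../enwiki.train.f0.p0',
                        'test': '.../enwiki.test.f0.p0'}]],
            'all': [{'all': '.../enwiki.all.p0'}],
        },
    },
}

# Round-trips through json, as the pipeline expects
decoded = json.loads(json.dumps(stats))
assert decoded['wikis']['enwiki']['num_folds'] == 5
```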
+
+def parse_arguments(argv):
+    parser = argparse.ArgumentParser(description='Prepare XGB binary matrices')
+    parser.add_argument(
+        '-i', '--input', dest='input_dir', type=str, required=True,
+        help='Input path, prefixed with hdfs://, to dataframe with labels and features')
+    parser.add_argument(
+        '-o', '--output-dir', dest='output_dir', type=str, required=True,
+        help='Output path, prefixed with hdfs://, to store binary matrices')
+    parser.add_argument(
+        '-f', '--num-folds', dest='num_folds', type=int, default=5,
+        help='The number of folds to split the data into')
+    parser.add_argument(
+        '-x', '--max-executors', dest='max_executors', type=int, default=None,
+        help='The maximum number of executors to use to write out folds')
+    parser.add_argument(
+        '-w', '--num-workers', dest='num_workers', type=int, default=1,
+        help='The number of workers used to train a single model')
+    parser.add_argument(
+        '-z', '--zero-features', dest='zero_features', type=str, required=False, default=None,
+        help='TODO')
+    parser.add_argument(
+        'wikis', metavar='wiki', type=str, nargs='*',
+        help='List of wikis to build matrices for')
+    args = parser.parse_args(argv)
+    return dict(vars(args))
+
+
+def main(argv=None):
+    args = parse_arguments(argv)
+
+    app_name = 'MLR: write binary folded datasets'
+    if args['wikis']:
+        app_name += ': ' + ', '.join(args['wikis'])
+    sc = SparkContext(appName=app_name)
+    sc.setLogLevel('WARN')
+    sqlContext = HiveContext(sc)
+
+    make_folds(sc, sqlContext, **args)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/mjolnir/utilities/spark.py b/mjolnir/utilities/spark.py
index b911295..d49555d 100644
--- a/mjolnir/utilities/spark.py
+++ b/mjolnir/utilities/spark.py
@@ -226,6 +226,16 @@
             'commands': {
                 'pyspark': ['spark_command'],
                 'data_pipeline': ['spark_command', 'mjolnir_utility_path', 'mjolnir_utility'],
+                'make_folds': {
+                    'spark_command': [],
+                    'mjolnir_utility_path': [],
+                    'mjolnir_utility': [],
+                    'spark_conf': [
+                        'spark.yarn.executor.memoryOverhead',
+                    ],
+                    'spark_args': [],
+                    'cmd_args': ['num-workers', 'num-folds']
+                },
                 'training_pipeline': {
                     # Using an empty array requires the key exists, even if it doesn't
                     # contain sub-properties.
@@ -238,7 +248,7 @@
                         'spark.task.cpus'
                     ],
                     'spark_args': ['executor-memory', 'executor-cores'],
-                    'cmd_args': ['workers', 'cv-jobs', 'folds', 'final-trees']
+                    'cmd_args': ['cv-jobs', 'final-trees']
                 }
             }
         })
@@ -406,6 +416,13 @@
     subprocess_check_call(cmd, env=config['environment'])
 
 
+def make_folds(global_profile, profiles):
+    for name, profile in profiles.items():
+        config = profile['commands']['make_folds']
+        cmd = build_spark_command(config) + build_mjolnir_utility(config) + profile['wikis']
+        subprocess_check_call(cmd, env=config['environment'])
+
+
 def train(global_profile, profiles):
     """Run mjolnir training pipeline"""
     for name, profile in profiles.items():
@@ -497,9 +514,13 @@
             'func': train,
             'needed': ['training_pipeline'],
         },
+        'make_folds': {
+            'func': make_folds,
+            'needed': ['make_folds'],
+        },
         'collect_and_train': {
             'func': collect_and_train,
-            'needed': ['data_pipeline', 'training_pipeline'],
+            'needed': ['data_pipeline', 'make_folds', 'training_pipeline'],
         },
         'shell': {
             'func': lambda x, y: shell('pyspark', x, y),
diff --git a/mjolnir/utilities/training_pipeline.py b/mjolnir/utilities/training_pipeline.py
index 6918768..99a72c8 100644
--- a/mjolnir/utilities/training_pipeline.py
+++ b/mjolnir/utilities/training_pipeline.py
@@ -8,69 +8,65 @@
         --artifacts 'mjolnir_venv.zip#venv' \
         path/to/training_pipeline.py
 """
-
 from __future__ import absolute_import
 import argparse
-import collections
 import datetime
 import glob
+import json
 import logging
 import mjolnir.feature_engineering
 import mjolnir.training.xgboost
+from mjolnir.utils import hdfs_open_read
 import os
 import pickle
-import sys
 from pyspark import SparkContext
 from pyspark.sql import HiveContext
-from pyspark.sql import functions as F
+import sys
 
 
-def summarize_training_df(df, data_size):
-    """Generate a summary of min/max/stddev/mean of data frame
+def run_pipeline(sc, sqlContext, input_dir, output_dir, wikis, initial_num_trees, final_num_trees, num_cv_jobs):
+    with hdfs_open_read(os.path.join(input_dir, 'stats.json')) as f:
+        stats = json.loads(f.read())
 
-    Parameters
-    ----------
-    df : pyspark.sql.DataFrame
-        DataFrame used for training
-    data_size : int
-        Number of rows in df
+    wikis_available = set(stats['wikis'].keys())
+    if wikis:
+        missing = set(wikis).difference(wikis_available)
+        if missing:
+            raise Exception("Wikis not available: " + ", ".join(missing))
+        wikis = wikis_available.intersection(wikis)
+    else:
+        wikis = stats['wikis'].keys()
+    if not wikis:
+        raise Exception("No wikis provided")
 
-    Returns
-    -------
-    dict
-        Map from field name to a map of statistics about the field
-    """
-    if data_size > 10000000:
-        df = df.repartition(200)
-    summary = collections.defaultdict(dict)
-    for row in mjolnir.feature_engineering.explode_features(df).describe().collect():
-        statistic = row.summary
-        for field, value in (x for x in row.asDict().items() if x[0] != 'summary'):
-            summary[field][statistic] = value
-    return dict(summary)
-
-
-def run_pipeline(sc, sqlContext, input_dir, output_dir, wikis, initial_num_trees, final_num_trees,
-                 num_workers, num_cv_jobs, num_folds, zero_features):
     for wiki in wikis:
+        config = stats['wikis'][wiki]
+
         print 'Training wiki: %s' % (wiki)
-        df_hits_with_features = (
-            sqlContext.read.parquet(input_dir)
-            .where(F.col('wikiid') == wiki))
+        num_folds = config['num_folds']
+        if num_cv_jobs is None:
+            num_cv_jobs = num_folds
 
-        data_size = df_hits_with_features.count()
-        if data_size == 0:
-            print 'No data found.' % (wiki)
-            print ''
-            continue
+        # Add extension matching training type
+        extension = ".xgb"
 
-        if zero_features:
-            df_hits_with_features = mjolnir.feature_engineering.zero_features(
-                    df_hits_with_features, zero_features)
+        # Add file extensions to all the folds
+        folds = config['folds']
+        for fold in folds:
+            for partition in fold:
+                for name, path in partition.items():
+                    partition[name] = path + extension
+
+        # "all" data with no splits
+        all_paths = config['all']
+        for partition in all_paths:
+            for name, path in partition.items():
+                partition[name] = path + extension
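
The extension-appending loops above rewrite every path in the nested fold structure in place. The same transformation can be sketched as a pure function over the structure (illustrative, using made-up paths):

```python
def add_extension(folds, extension='.xgb'):
    """Return a copy of folds with the training-format extension
    appended to every partition path (mirrors the loops above)."""
    return [[{name: path + extension for name, path in partition.items()}
             for partition in fold]
            for fold in folds]

folds = [[{'train': 'hdfs://tmp/f0.train', 'test': 'hdfs://tmp/f0.test'}]]
out = add_extension(folds)
assert out[0][0]['train'] == 'hdfs://tmp/f0.train.xgb'
assert out[0][0]['test'] == 'hdfs://tmp/f0.test.xgb'
```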
 
         tune_results = mjolnir.training.xgboost.tune(
-            df_hits_with_features, num_folds=num_folds,
-            num_cv_jobs=num_cv_jobs, num_workers=num_workers,
+            folds, config['stats'],
+            num_cv_jobs=num_cv_jobs,
+            train_matrix="train",
             initial_num_trees=initial_num_trees,
             final_num_trees=final_num_trees)
 
@@ -81,29 +77,21 @@
             'wiki': wiki,
             'input_dir': input_dir,
             'training_datetime': datetime.datetime.now().isoformat(),
-            'num_observations': data_size,
-            'num_queries': df_hits_with_features.select('query').drop_duplicates().count(),
-            'num_norm_queries': df_hits_with_features.select('norm_query_id').drop_duplicates().count(),
-            'features': df_hits_with_features.schema['features'].metadata['features'],
-            'summary': summarize_training_df(df_hits_with_features, data_size)
+            'dataset': config['stats'],
         }
 
-        # Train a model over all data with best params. Use a copy
-        # so j_groups doesn't end up inside tune_results and prevent
-        # pickle from serializing it.
+        # Train a model over all data with best params.
         best_params = tune_results['params'].copy()
         print 'Best parameters:'
         for param, value in best_params.items():
             print '\t%20s: %s' % (param, value)
-        df_grouped, j_groups = mjolnir.training.xgboost.prep_training(
-            df_hits_with_features, num_workers)
-        best_params['groupData'] = j_groups
         model = mjolnir.training.xgboost.train(
-                df_grouped, best_params)
+            all_paths, best_params, train_matrix="all")
 
-        tune_results['metrics']['train'] = model.eval(df_grouped, j_groups)
-        df_grouped.unpersist()
-        print 'train-ndcg@10: %.5f' % (tune_results['metrics']['train'])
+        tune_results['metrics'] = {
+            'train': model.summary().train()
+        }
+        print 'train-ndcg@10: %.5f' % (tune_results['metrics']['train'][-1])
 
         # Save the tune results somewhere for later analysis. Use pickle
         # to maintain the hyperopt.Trials objects as is. It might be nice
@@ -120,26 +108,17 @@
 
         # Generate a feature map so xgboost can include feature names in the dump.
         # The final `q` indicates all features are quantitative values (floats).
-        features = df_hits_with_features.schema['features'].metadata['features']
+        features = config['stats']['features']
         json_model_output = os.path.join(output_dir, 'model_%s.json' % (wiki))
         with open(json_model_output, 'wb') as f:
             f.write(model.dump(features))
             print 'Wrote xgboost json model to %s' % (json_model_output)
         # Write out the xgboost binary format as well, so it can be re-loaded
         # and evaluated
-        xgb_model_output = os.path.join(output_dir, 'model_%s.xgb' % (wiki))
-        model.saveModelAsLocalFile(xgb_model_output)
-        print 'Wrote xgboost binary model to %s' % (xgb_model_output)
+        model_output = os.path.join(output_dir, 'model_%s.xgb' % (wiki))
+        model.saveModelAsLocalFile(model_output)
+        print 'Wrote xgboost binary model to %s' % (model_output)
         print ''
-
-
-def str_to_bool(value):
-    if value.lower() in ['true', 'yes', '1']:
-        return True
-    elif value.lower() in ['false', 'no', '0']:
-        return False
-    else:
-        raise ValueError("Unknown boolean string: " + value)
 
 
 def parse_arguments(argv):
@@ -152,17 +131,10 @@
         help='Path, on local filesystem, to directory to store the results of '
              'model training to.')
     parser.add_argument(
-        '-w', '--workers', dest='num_workers', default=10, type=int,
-        help='Number of workers to train each individual model with. The total number '
-             + 'of executors required is workers * cv-jobs. (Default: 10)')
-    parser.add_argument(
         '-c', '--cv-jobs', dest='num_cv_jobs', default=None, type=int,
        help='Number of cross validation folds to perform in parallel. Defaults to number '
             + 'of folds, to run all in parallel. If this is a multiple of the number '
              + 'of folds multiple cross validations will run in parallel.')
-    parser.add_argument(
-        '-f', '--folds', dest='num_folds', default=5, type=int,
-        help='Number of cross validation folds to use. (Default: 5)')
     parser.add_argument(
         '--initial-trees', dest='initial_num_trees', default=100, type=int,
        help='Number of trees to perform hyperparameter tuning with.  (Default: 100)')
@@ -171,21 +143,16 @@
        help='Number of trees in the final ensemble. If not provided the value from '
              + '--initial-trees will be used.  (Default: None)')
     parser.add_argument(
-        '-z', '--zero-feature', dest='zero_features', type=str, nargs='+',
-        help='Zero out feature in input')
-    parser.add_argument(
         '-v', '--verbose', dest='verbose', default=False, action='store_true',
         help='Increase logging to INFO')
     parser.add_argument(
        '-vv', '--very-verbose', dest='very_verbose', default=False, action='store_true',
         help='Increase logging to DEBUG')
     parser.add_argument(
-        'wikis', metavar='wiki', type=str, nargs='+',
+        'wikis', metavar='wiki', type=str, nargs='*',
         help='A wiki to perform model training for.')
 
     args = parser.parse_args(argv)
-    if args.num_cv_jobs is None:
-        args.num_cv_jobs = args.num_folds
     return dict(vars(args))
 
 
@@ -202,7 +169,8 @@
    # TODO: Set spark configuration? Some can't actually be set here though, so best might be to set all of it
     # on the command line for consistency.
     app_name = "MLR: training pipeline xgboost"
-    app_name += ': ' + ', '.join(args['wikis'])
+    if args['wikis']:
+        app_name += ': ' + ', '.join(args['wikis'])
     sc = SparkContext(appName=app_name)
     sc.setLogLevel('WARN')
     sqlContext = HiveContext(sc)
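The extension-appending loops near the top of this change walk a nested config: a list of CV folds, each a list of partition dicts mapping a matrix name to its path, plus an unsplit "all" dataset in the same shape. A minimal, self-contained sketch of that transform (the paths and partition names here are hypothetical, for illustration only):

```python
# Hypothetical mini-config mirroring the structure the loops walk:
# 'folds' -> list of folds -> list of partition dicts (name -> path),
# 'all' -> the unsplit dataset in the same partition-dict shape.
config = {
    'folds': [
        [{'train': '/data/f0/train', 'test': '/data/f0/test'}],
        [{'train': '/data/f1/train', 'test': '/data/f1/test'}],
    ],
    'all': [{'all': '/data/all'}],
}
extension = '.xgb'

# Append the binary-format extension to every partition path in place.
for fold in config['folds']:
    for partition in fold:
        for name, path in partition.items():
            partition[name] = path + extension
for partition in config['all']:
    for name, path in partition.items():
        partition[name] = path + extension
```

After this every path in the config points at the pre-built xgboost binary file rather than the bare prefix make_folds wrote.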
diff --git a/mjolnir/utils.py b/mjolnir/utils.py
new file mode 100644
index 0000000..7ac6422
--- /dev/null
+++ b/mjolnir/utils.py
@@ -0,0 +1,112 @@
+from __future__ import absolute_import
+import contextlib
+import os
+import subprocess
+import tempfile
+import urlparse
+
+
+def multi_with(f):
+    @contextlib.contextmanager
+    def manager(inputs, **kwargs):
+        def make_child(data):
+            with f(data, **kwargs) as inner:
+                yield inner
+
+        # children list keeps the children alive until the end of the function.
+        # python will exit the with statements above when cleaning up this list.
+        try:
+            children = []
+            output = []
+            for data in inputs:
+                child = make_child(data)
+                children.append(child)
+                output.append(child.send(None))
+            yield output
+        finally:
+            errors = []
+            for child in children:
+                try:
+                    child.send(None)
+                except StopIteration:
+                    pass
+                except Exception as e:
+                    errors.append(e)
+                else:
+                    errors.append(Exception("Expected StopIteration"))
+            if errors:
+                raise errors[0]
+
+    return manager
+
+
+@contextlib.contextmanager
+def as_output_file(path):
+    if path[:7] == 'hdfs://':
+        f = tempfile.NamedTemporaryFile()
+        yield f
+        f.flush()
+        subprocess.check_call(['hdfs', 'dfs', '-copyFromLocal', f.name, path])
+    else:
+        if path[:len("file:/")] == "file:/":
+            path = path[len("file:"):]
+        with open(path, 'w') as f:
+            yield f
+
+
+@contextlib.contextmanager
+def as_local_path(path, with_query=False):
+    if path[0] == '/':
+        yield path
+    elif path[:len("file:/")] == "file:/":
+        yield path[len("file:"):]
+    else:
+        # TODO: Untested
+        with tempfile.NamedTemporaryFile() as local:
+            subprocess.check_call(['hdfs', 'dfs', '-copyToLocal', path, local.name])
+            if with_query:
+                try:
+                    subprocess.check_call(['hdfs', 'dfs', '-copyToLocal', path + ".query", local.name + ".query"])
+                    yield local.name
+                finally:
+                    try:
+                        os.unlink(local.name + ".query")
+                    except OSError:
+                        pass
+            else:
+                yield local.name
+
+
+as_local_paths = multi_with(as_local_path)
+as_output_files = multi_with(as_output_file)
+
+
+def hdfs_mkdir(path):
+    # Will error if it already exists
+    # TODO: Normalize error type?
+    if path[:7] == 'hdfs://':
+        subprocess.check_call(['hdfs', 'dfs', '-mkdir', path])
+    else:
+        os.mkdir(path)
+
+
+def hdfs_unlink(*paths):
+    remote = []
+    for path in paths:
+        if path[:7] == 'hdfs://':
+            remote.append(path)
+        else:
+            if path[:len("file:/")] == "file:/":
+                path = path[len("file:"):]
+            os.unlink(path)
+    if remote:
+        subprocess.check_call(['hdfs', 'dfs', '-rm'] + remote)
+
+
+@contextlib.contextmanager
+def hdfs_open_read(path):
+    if path[:7] == 'hdfs://':
+        parts = urlparse.urlparse(path)
+        path = os.path.join('/mnt/hdfs', parts.path[1:])
+    with open(path, 'r') as f:
+        yield f
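The `multi_with` helper drives each child generator by hand: the first `send(None)` enters the child's `with` block and captures the yielded resource, and the second `send(None)` during cleanup runs each child to `StopIteration`, releasing all resources. The sketch below re-states the helper (using the modern `except ... as` syntax) together with a toy `tracked` context manager, which is purely hypothetical, to make the open/close ordering visible:

```python
import contextlib


def multi_with(f):
    # Lift a context manager over one item into a context manager
    # over a list of items, yielding all inner results at once.
    @contextlib.contextmanager
    def manager(inputs, **kwargs):
        def make_child(data):
            with f(data, **kwargs) as inner:
                yield inner

        children = []
        output = []
        try:
            for data in inputs:
                child = make_child(data)
                children.append(child)
                # First send enters the child's with block.
                output.append(child.send(None))
            yield output
        finally:
            errors = []
            for child in children:
                try:
                    # Second send exits the with block, releasing the resource.
                    child.send(None)
                except StopIteration:
                    pass
                except Exception as e:
                    errors.append(e)
                else:
                    errors.append(Exception("Expected StopIteration"))
            if errors:
                raise errors[0]

    return manager


log = []


@contextlib.contextmanager
def tracked(name):
    # Toy stand-in for as_local_path: record open/close events
    # so the acquisition and cleanup ordering is observable.
    log.append('open ' + name)
    yield name.upper()
    log.append('close ' + name)


tracked_all = multi_with(tracked)

with tracked_all(['a', 'b']) as values:
    opened = list(values)
```

All children stay open for the duration of the outer `with`, and cleanup of one child proceeds even if another child's cleanup raised, with the first error re-raised at the end.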
diff --git a/setup.py b/setup.py
index d2f22f1..9bd27e4 100644
--- a/setup.py
+++ b/setup.py
@@ -9,6 +9,9 @@
     'kafka',
     'pyyaml',
     'hyperopt',
+    # python xgboost is only used for building
+    # binary datasets. Primary usage is from jvm.
+    'xgboost',
     # hyperopt requires networkx < 2.0, but doesn't say so
     'networkx<2.0',
     # pyspark requirements


Gerrit-MessageType: merged
Gerrit-Change-Id: Ic8a1d4f405bcd1ad07dd53ef392f47d8dfa89246
Gerrit-PatchSet: 10
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <[email protected]>
Gerrit-Reviewer: DCausse <[email protected]>
Gerrit-Reviewer: EBernhardson <[email protected]>
Gerrit-Reviewer: Hashar <[email protected]>
Gerrit-Reviewer: jenkins-bot <>
