jenkins-bot has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/403334 )
Change subject: Convert to pre-built folds in binary files
......................................................................
Convert to pre-built folds in binary files
I've been suspicious of our excessive memory usage and think
shuffling the data through spark and its various caching
solutions may be responsible for part of it.

Try a different approach by adding a new utility script, make_folds,
which splits the data up into CV folds and writes out all the
partitions in xgboost binary format. At training time these files
are copied from HDFS to a temporary file on the local machine so
xgboost can read them. Future updates could perhaps track which
datasets are already available locally, rather than re-transferring
the dataset for each model.
Change-Id: Ic8a1d4f405bcd1ad07dd53ef392f47d8dfa89246
---
M .gitignore
M example_train.yaml
M jvm/src/main/scala/org/wikimedia/search/mjolnir/DataWriter.scala
M jvm/src/main/scala/org/wikimedia/search/mjolnir/MlrXGBoost.scala
M jvm/src/test/scala/org/wikimedia/search/mjolnir/DBNSuite.scala
M jvm/src/test/scala/org/wikimedia/search/mjolnir/DataWriterSuite.scala
M mjolnir/test/conftest.py
A mjolnir/test/fixtures/datasets/test.xgb
A mjolnir/test/fixtures/datasets/test.xgb.query
A mjolnir/test/fixtures/datasets/train.xgb
A mjolnir/test/fixtures/datasets/train.xgb.query
M mjolnir/test/fixtures/load_config/example_train.expect
A mjolnir/test/test_utils.py
M mjolnir/test/training/test_hyperopt.py
M mjolnir/test/training/test_tuning.py
M mjolnir/test/training/test_xgboost.py
M mjolnir/training/hyperopt.py
M mjolnir/training/tuning.py
M mjolnir/training/xgboost.py
A mjolnir/utilities/make_folds.py
M mjolnir/utilities/spark.py
M mjolnir/utilities/training_pipeline.py
A mjolnir/utils.py
M setup.py
24 files changed, 1,112 insertions(+), 442 deletions(-)
Approvals:
jenkins-bot: Verified
DCausse: Looks good to me, approved
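The copy-to-local step described in the commit message can be sketched as follows. This is an illustrative sketch only, not mjolnir's actual utils.py; the function name and reading via the /mnt/hdfs fuse mount are assumptions:

```python
import shutil
import tempfile

def fetch_fold_locally(hdfs_mounted_path):
    # Hypothetical helper: copy one fold partition from the HDFS
    # fuse mount to a local temporary file, since xgboost's C++
    # reader only understands local filesystem paths.
    local = tempfile.NamedTemporaryFile(suffix=".xgb", delete=False)
    with open(hdfs_mounted_path, "rb") as src:
        shutil.copyfileobj(src, local)
    local.close()
    # The caller could then load it, e.g. xgboost.DMatrix(local.name)
    return local.name
```

As the message notes, a later improvement could keep a registry of files already copied so the same fold is not re-transferred for every model.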
diff --git a/.gitignore b/.gitignore
index 83930f6..ca06e47 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,6 +3,7 @@
*.py[cod]
*~
/jvm/target
+/jvm/mjolnir.iml
# Distribution / packaging
venv/
diff --git a/example_train.yaml b/example_train.yaml
index 7b1749c..36abc70 100644
--- a/example_train.yaml
+++ b/example_train.yaml
@@ -20,6 +20,10 @@
hdfs_training_data_path: "hdfs://analytics-hadoop/%(training_data_path)s"
# Fully qualified local path to the training data
local_training_data_path: "/mnt/hdfs/%(training_data_path)s"
+ # Fully qualified HDFS path to pre-folded data
+ fold_data_path: "user/%(USER)s/mjolnir/%(marker)s-folds_%(profile_name)s"
+ hdfs_fold_data_path: "hdfs://analytics-hadoop/%(fold_data_path)s"
+ local_fold_data_path: "/mnt/hdfs/%(training_data_path)s"
# Base directory used to build path to write training output to
base_training_output_dir: "%(HOME)s/training_size"
# Number of cpu cores to assign per task. Must be a multiple of
@@ -60,7 +64,7 @@
executor-memory: "%(executor_memory)s"
# Source our jvm dependencies from archiva.
repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
- packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:%(spark_version)s
+ packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:%(spark_version)s
spark_conf:
spark.task.cpus: "%(cores_per_task)s"
spark.yarn.executor.memoryOverhead: "%(executor_memory_overhead)s"
@@ -76,7 +80,6 @@
template_vars:
cores_per_executor: 4
cores_per_task: 4
- executor_memory: 2G
executor_memory_overhead: 6144
data_pipeline:
spark_command: "%(SPARK_HOME)s/bin/spark-submit"
@@ -93,6 +96,19 @@
samples-per-wiki: 35000000
search-cluster: codfw
min-sessions: "%(min_sessions_per_query)s"
+ make_folds:
+ spark_command: "%(SPARK_HOME)s/bin/spark-submit"
+ mjolnir_utility_path: "%(mjolnir_utility_path)s"
+ mjolnir_utility: make_folds
+ spark_conf:
+ spark.locality.wait: 0
+ spark.dynamicAllocation.maxExecutors: 100
+ cmd_args:
+ input: "%(hdfs_training_data_path)s"
+ output-dir: "%(hdfs_fold_data_path)s"
+ num-folds: 5
+ num-workers: 1
+ max-executors: 50
training_pipeline:
spark_command: "%(SPARK_HOME)s/bin/spark-submit"
mjolnir_utility_path: "%(mjolnir_utility_path)s"
@@ -106,38 +122,42 @@
spark_args:
driver-memory: 3G
spark_conf:
- # Disabling auto broadcast join prevents memory explosion when spark
- # mis-predicts the size of a dataframe. (where does this happen?)
- spark.sql.autoBroadcastJoinThreshold: -1
# Adjusting up executor idle timeout from 60s to 180s is a bit greedy,
# but prevents a whole bunch of log spam from spark killing executors
# between CV runs
spark.dynamicAllocation.executorIdleTimeout: 180s
cmd_args:
- input: "%(hdfs_training_data_path)s"
- output: "%(base_training_output_dir)s/%(marker)s_%(profile_name)s"
+ input: "%(hdfs_fold_data_path)s"
+ output: "%(base_training_output_dir)s/%(marker)s-folds_%(profile_name)s"
# Individual training groups
profiles:
large:
- # 12M to 30M observations. 4M to 12M per executor.
- # Approximately 63 executors, 378 cores, 838GB memory
+ # 12M to 30M observations.
+ # Approximately 80 executors, 480 cores, 480GB memory
+ # 8-11 min per enwiki model @ 6 cores = 48 to 66 cpu minutes
+ # 2h40min to final model training
wikis:
- enwiki
- dewiki
commands:
+ make_folds:
+ template_vars:
+ # enough space to load datasets into c++
+ executor_memory_overhead: 5120
+ cmd_args:
+ num-folds: 3
training_pipeline:
template_vars:
cores_per_executor: 6
cores_per_task: 6
- executor_memory: 4G
- executor_memory_overhead: 9216
+ # 4096 fails with matrix of 350-400M floats
+ # cv-test-dcg@10: 0.8619
+ executor_memory_overhead: 6144
spark_conf:
- spark.dynamicAllocation.maxExecutors: 65
+ spark.dynamicAllocation.maxExecutors: 85
cmd_args:
- workers: 3
- cv-jobs: 22
- folds: 3
+ cv-jobs: 80
final-trees: 100
medium:
@@ -149,18 +169,19 @@
- frwiki
- ruwiki
commands:
+ make_folds:
+ template_vars:
+ # enough space to load datasets into c++
+ executor_memory_overhead: 1536
training_pipeline:
template_vars:
- cores_per_executor: 6
- cores_per_task: 6
- executor_memory: 3G
- executor_memory_overhead: 9216
+ cores_per_executor: 4
+ cores_per_task: 4
+ executor_memory_overhead: 2048
spark_conf:
- spark.dynamicAllocation.maxExecutors: 75
+ spark.dynamicAllocation.maxExecutors: 130
cmd_args:
- workers: 1
- cv-jobs: 70
- folds: 5
+ cv-jobs: 125
final-trees: 100
small:
@@ -177,21 +198,21 @@
- fiwiki
- jawiki
- arwiki
- - itwiki
- nlwiki
- zhwiki
- plwiki
commands:
+ make_folds:
+ template_vars:
+ # enough space to load datasets into c++
+ executor_memory_overhead: 1024
training_pipeline:
template_vars:
cores_per_executor: 4
cores_per_task: 4
- executor_memory: 2G
- executor_memory_overhead: 6144
+ executor_memory_overhead: 1024
spark_conf:
- spark.dynamicAllocation.maxExecutors: 105
+ spark.dynamicAllocation.maxExecutors: 130
cmd_args:
- workers: 1
- cv-jobs: 100
- folds: 5
+ cv-jobs: 125
final-trees: 500
diff --git a/jvm/src/main/scala/org/wikimedia/search/mjolnir/DataWriter.scala b/jvm/src/main/scala/org/wikimedia/search/mjolnir/DataWriter.scala
index dfebed2..8d6976b 100644
--- a/jvm/src/main/scala/org/wikimedia/search/mjolnir/DataWriter.scala
+++ b/jvm/src/main/scala/org/wikimedia/search/mjolnir/DataWriter.scala
@@ -138,7 +138,8 @@
/**
* @param df Output from data_pipeline. Must be repartitioned on query and sorted by query
- * within partitions.
+ * within partitions. This should have a large number of partitions or later coalesce
+ * will be imbalanced.
* @param numWorkers The number of partitions each data file will be emitted as
* @param pathFormat Format for hdfs paths. Params are %s: name, %s: fold, %d: partition
* @param foldCol Long column to source which fold a row belongs to
@@ -146,13 +147,15 @@
* from split name to hdfs path for that partition.
*/
def write(
- df: DataFrame,
- numWorkers: Int,
- pathFormat: String,
- foldCol: Option[String]
- ): Array[Array[Map[String, String]]] = {
+ df: DataFrame,
+ numWorkers: Int,
+ pathFormat: String,
+ foldCol: Option[String]
+ ): Array[Array[Map[String, String]]] = {
val rdd = df.rdd.mapPartitions(formatPartition(df.schema, foldCol))
-
+ if (rdd.getNumPartitions < numWorkers) {
+ throw new java.io.IOException("Cannot have fewer partitions in input than output")
+ }
val numFolds = foldCol.map { name =>
df.schema(name).metadata.getLong("num_folds").toInt }.getOrElse(1)
try {
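The foldCol consumed above tags each row with the fold its query belongs to; the essential property is that all rows of one query share a fold, so a query never straddles a train/test split. A minimal Python sketch of that idea (illustrative only, not the code used by make_folds):

```python
import zlib

def assign_fold(query, num_folds):
    # Hash the query string deterministically so every row of the
    # same query lands in the same fold. Splitting one query between
    # train and test would leak information across the CV boundary.
    return zlib.crc32(query.encode("utf-8")) % num_folds

folds = [assign_fold(q, 3) for q in ["foo", "bar", "baz", "foo"]]
```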
diff --git a/jvm/src/main/scala/org/wikimedia/search/mjolnir/MlrXGBoost.scala b/jvm/src/main/scala/org/wikimedia/search/mjolnir/MlrXGBoost.scala
index 1a2bebf..2e9f7b4 100644
--- a/jvm/src/main/scala/org/wikimedia/search/mjolnir/MlrXGBoost.scala
+++ b/jvm/src/main/scala/org/wikimedia/search/mjolnir/MlrXGBoost.scala
@@ -46,14 +46,14 @@
}
private[mjolnir] def buildDistributedBoosters(
- rdds: RDD[Map[String, String]],
+ rdd: RDD[Map[String, String]],
trainMatrix: String,
params: Map[String, Any],
rabitEnv: Option[java.util.Map[String, String]],
numRounds: Int,
earlyStoppingRound: Int = 0
): RDD[(Array[Byte], Map[String, Array[Float]])] =
- rdds.mapPartitions({ rows=>
+ rdd.mapPartitions({ rows=>
// XGBoost refuses to load our binary format if rabit has been
// initialized, so we do it early. This makes for the odd situation
// where we need to dispose of them before rabit is shutdown.
diff --git a/jvm/src/test/scala/org/wikimedia/search/mjolnir/DBNSuite.scala b/jvm/src/test/scala/org/wikimedia/search/mjolnir/DBNSuite.scala
index 2ee6ed6..2a6f7b7 100644
--- a/jvm/src/test/scala/org/wikimedia/search/mjolnir/DBNSuite.scala
+++ b/jvm/src/test/scala/org/wikimedia/search/mjolnir/DBNSuite.scala
@@ -159,9 +159,11 @@
assert(math.abs(sessionEstimate.s.sum - sessionEstimate.s(0) - sessionEstimate.s(10)) < 0.0001D)
}
+ private val FULL_BENCHMARK = false
+
// Takes ~1.5s on my laptop versus 90 seconds in python
test("basic benchmark") {
- val nQueries = 5000
+ val nQueries = if (FULL_BENCHMARK) 5000 else 100
val nSessionsPerQuery = 20
val nIterations = 40
val nResultsPerQuery = 20
diff --git a/jvm/src/test/scala/org/wikimedia/search/mjolnir/DataWriterSuite.scala b/jvm/src/test/scala/org/wikimedia/search/mjolnir/DataWriterSuite.scala
index 4a17e78..ca85260 100644
--- a/jvm/src/test/scala/org/wikimedia/search/mjolnir/DataWriterSuite.scala
+++ b/jvm/src/test/scala/org/wikimedia/search/mjolnir/DataWriterSuite.scala
@@ -26,6 +26,10 @@
.withColumn("features", randVec(5)())
.withColumn("fold", randLong(numFolds)().as("fold", meta))
.withColumn("query", randString(queries)())
+ // Must have more partitions than numFolds above
+ // or coalesce won't do anything.
+ .repartition(numFolds * 2, F.col("query"))
+ .sortWithinPartitions("query")
}
test("Write out various standard fold configs as text files") {
@@ -42,7 +46,7 @@
val pattern = s"$testDir/%s-fold-%s-partition-%d"
val writer = new DataWriter(spark.sparkContext)
val folds = writer.write(df, numWorkers, pattern, foldCol)
- // We only wrote an "all" fold, so top level should have a single item
+
assert(folds.length == expectedFolds)
folds.foreach { fold =>
assert(fold.length == numWorkers)
@@ -52,6 +56,7 @@
// Items expected but not in partition
assert(expectedSplits.diff(partition.keySet).isEmpty)
partition.values.foreach { path =>
+ // Paths should actually exist
assert(Files.exists(Paths.get(path.substring("file:".length))))
}
}
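The test above uses the path pattern `%s-fold-%s-partition-%d`; expanding it per fold and partition looks roughly like this (a sketch under stated assumptions — the helper name is made up, not part of mjolnir):

```python
def fold_paths(path_format, name, num_folds, num_workers):
    # Expand DataWriter's pattern: %s name, %s fold, %d partition.
    # Yields one output path per (fold, worker partition) pair.
    return [
        [path_format % (name, fold, part) for part in range(num_workers)]
        for fold in range(num_folds)
    ]

paths = fold_paths("/tmp/%s-fold-%s-partition-%d", "train", 2, 3)
```

This mirrors the nested `Array[Array[Map[...]]]` shape that `DataWriter.write` returns: outer index is the fold, inner index the worker partition.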
diff --git a/mjolnir/test/conftest.py b/mjolnir/test/conftest.py
index bf2ad8c..0ab729c 100644
--- a/mjolnir/test/conftest.py
+++ b/mjolnir/test/conftest.py
@@ -24,6 +24,27 @@
return os.path.join(cur_dir, 'fixtures')
+@pytest.fixture
+def folds_a(fixtures_dir):
+ fixtures_dir = os.path.join(fixtures_dir, 'datasets')
+ return [
+ [{"train": os.path.join(fixtures_dir, "train.xgb"), "test": os.path.join(fixtures_dir, "test.xgb")}]
+ ]
+
+
+@pytest.fixture
+def folds_b(fixtures_dir):
+ fixtures_dir = os.path.join(fixtures_dir, 'datasets')
+
+ def f(path):
+ return os.path.join(fixtures_dir, path + ".xgb")
+
+ return [
+ [{"train": f("train.f0.p0")}, {"train": f("train.f0.p1")}],
+ [{"train": f("train.f1.p0")}, {"train": f("train.f1.p1")}]
+ ]
+
+
@pytest.fixture(scope="session")
def spark_context(request):
"""Fixture for creating a spark context.
@@ -49,8 +70,8 @@
.setAppName("pytest-pyspark-local-testing")
# Maven coordinates of jvm dependencies
.set('spark.jars.packages', ','.join([
- 'ml.dmlc:xgboost4j-spark:0.7-wmf-1',
- 'org.wikimedia.search:mjolnir:0.2',
+ 'ml.dmlc:xgboost4j-spark:0.8-wmf-1',
+ 'org.wikimedia.search:mjolnir:0.3',
'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0']))
# By default spark will shuffle to 200 partitions, which is
# way too many for our small test cases. This cuts execution
diff --git a/mjolnir/test/fixtures/datasets/test.xgb b/mjolnir/test/fixtures/datasets/test.xgb
new file mode 100644
index 0000000..d3d6afa
--- /dev/null
+++ b/mjolnir/test/fixtures/datasets/test.xgb
@@ -0,0 +1,100 @@
+3 1:0.326636 2:0.077529 3:0.371918 4:0.380291 5:0.014513
+0 1:0.972146 2:0.158156 3:0.914378 4:0.862571 5:0.320560
+1 1:0.298956 2:0.364456 3:0.883224 4:0.396381 5:0.793094
+3 1:0.661035 2:0.051392 3:0.516250 4:0.451712 5:0.944109
+0 1:0.864322 2:0.525315 3:0.786576 4:0.989264 5:0.083457
+3 1:0.404920 2:0.319258 3:0.998247 4:0.337145 5:0.418243
+0 1:0.510521 2:0.187725 3:0.330769 4:0.927407 5:0.335392
+3 1:0.814285 2:0.251424 3:0.506982 4:0.762243 5:0.100247
+3 1:0.117167 2:0.819767 3:0.709812 4:0.796283 5:0.540541
+3 1:0.897631 2:0.580800 3:0.026243 4:0.964434 5:0.231140
+2 1:0.391528 2:0.683021 3:0.997876 4:0.568064 5:0.010284
+2 1:0.454713 2:0.288310 3:0.848797 4:0.748149 5:0.522314
+1 1:0.168367 2:0.309259 3:0.664556 4:0.824906 5:0.370062
+0 1:0.650610 2:0.220118 3:0.321737 4:0.612518 5:0.071387
+1 1:0.411739 2:0.803898 3:0.042743 4:0.842187 5:0.391775
+2 1:0.807920 2:0.979387 3:0.307194 4:0.988092 5:0.529102
+2 1:0.755759 2:0.365076 3:0.885247 4:0.565210 5:0.615610
+0 1:0.254644 2:0.437205 3:0.257438 4:0.279958 5:0.175046
+3 1:0.444070 2:0.378370 3:0.610439 4:0.936986 5:0.287545
+0 1:0.967863 2:0.092285 3:0.987719 4:0.792169 5:0.690630
+2 1:0.455959 2:0.806951 3:0.713977 4:0.546367 5:0.950988
+0 1:0.790699 2:0.858631 3:0.611960 4:0.836762 5:0.386819
+0 1:0.240780 2:0.842705 3:0.766013 4:0.317449 5:0.359710
+1 1:0.783596 2:0.017780 3:0.765471 4:0.225174 5:0.000262
+1 1:0.144028 2:0.529792 3:0.203593 4:0.102651 5:0.774630
+1 1:0.886765 2:0.524375 3:0.555517 4:0.347809 5:0.076116
+3 1:0.094031 2:0.956022 3:0.274482 4:0.054051 5:0.856636
+1 1:0.921116 2:0.567716 3:0.406270 4:0.066023 5:0.419928
+1 1:0.768755 2:0.520684 3:0.138103 4:0.003975 5:0.975453
+2 1:0.936436 2:0.675702 3:0.286579 4:0.053526 5:0.297008
+1 1:0.304488 2:0.246547 3:0.547152 4:0.094481 5:0.680314
+1 1:0.058460 2:0.155417 3:0.608897 4:0.400933 5:0.805939
+0 1:0.537291 2:0.740986 3:0.590492 4:0.608429 5:0.549569
+0 1:0.981997 2:0.140494 3:0.061062 4:0.045810 5:0.401585
+2 1:0.369939 2:0.285637 3:0.853111 4:0.225219 5:0.426097
+3 1:0.908313 2:0.569428 3:0.497288 4:0.491016 5:0.006803
+2 1:0.179035 2:0.642798 3:0.625024 4:0.927981 5:0.405290
+1 1:0.239090 2:0.710597 3:0.179604 4:0.371392 5:0.449195
+1 1:0.579927 2:0.728269 3:0.206559 4:0.424931 5:0.713960
+3 1:0.649605 2:0.731400 3:0.631224 4:0.175438 5:0.304366
+2 1:0.669838 2:0.153425 3:0.771762 4:0.553746 5:0.040279
+0 1:0.436897 2:0.473752 3:0.795377 4:0.820149 5:0.190161
+0 1:0.640871 2:0.516977 3:0.592973 4:0.490025 5:0.577195
+2 1:0.997780 2:0.095394 3:0.651330 4:0.749575 5:0.660816
+3 1:0.152661 2:0.704911 3:0.947403 4:0.870216 5:0.798095
+1 1:0.747997 2:0.075223 3:0.939461 4:0.643620 5:0.914241
+2 1:0.133448 2:0.430715 3:0.789519 4:0.610028 5:0.903299
+0 1:0.401159 2:0.373494 3:0.659331 4:0.294644 5:0.629553
+0 1:0.207960 2:0.361213 3:0.911492 4:0.554985 5:0.493094
+1 1:0.057594 2:0.713257 3:0.446922 4:0.420421 5:0.567548
+1 1:0.887591 2:0.980055 3:0.615915 4:0.974575 5:0.590911
+3 1:0.727584 2:0.967858 3:0.491858 4:0.012659 5:0.283389
+3 1:0.021647 2:0.394032 3:0.764637 4:0.590971 5:0.827838
+3 1:0.620332 2:0.833978 3:0.671682 4:0.236934 5:0.858537
+1 1:0.256581 2:0.496616 3:0.825439 4:0.632813 5:0.164283
+3 1:0.442709 2:0.256347 3:0.231125 4:0.436597 5:0.587120
+2 1:0.464535 2:0.337670 3:0.452877 4:0.892539 5:0.001143
+3 1:0.734347 2:0.180984 3:0.214319 4:0.363408 5:0.644541
+2 1:0.787623 2:0.952571 3:0.030398 4:0.049772 5:0.242738
+3 1:0.953805 2:0.961399 3:0.837528 4:0.898172 5:0.862682
+1 1:0.731495 2:0.847731 3:0.557340 4:0.431582 5:0.900491
+2 1:0.726246 2:0.828104 3:0.761575 4:0.919778 5:0.119979
+2 1:0.934568 2:0.153811 3:0.337450 4:0.940157 5:0.040626
+0 1:0.483256 2:0.741818 3:0.447331 4:0.580388 5:0.378437
+2 1:0.350344 2:0.330684 3:0.712829 4:0.787849 5:0.380805
+3 1:0.962389 2:0.991490 3:0.913410 4:0.158044 5:0.180574
+1 1:0.464680 2:0.328260 3:0.684814 4:0.092428 5:0.477978
+3 1:0.717346 2:0.441867 3:0.444311 4:0.014475 5:0.951867
+1 1:0.829431 2:0.888164 3:0.221868 4:0.171388 5:0.296757
+1 1:0.952573 2:0.183402 3:0.666165 4:0.527787 5:0.412143
+2 1:0.229767 2:0.820025 3:0.615662 4:0.820428 5:0.603999
+3 1:0.982424 2:0.041073 3:0.796715 4:0.015130 5:0.543146
+2 1:0.929637 2:0.825426 3:0.727255 4:0.677794 5:0.093995
+2 1:0.253085 2:0.925220 3:0.464014 4:0.905004 5:0.860812
+3 1:0.551381 2:0.561334 3:0.478454 4:0.881930 5:0.117634
+3 1:0.584691 2:0.779480 3:0.264777 4:0.778328 5:0.787311
+2 1:0.047838 2:0.133641 3:0.556090 4:0.565904 5:0.211877
+2 1:0.557917 2:0.261338 3:0.125646 4:0.258342 5:0.632127
+2 1:0.396906 2:0.526099 3:0.922920 4:0.899431 5:0.437990
+2 1:0.133891 2:0.128417 3:0.841312 4:0.671067 5:0.483595
+3 1:0.710280 2:0.183570 3:0.528577 4:0.940885 5:0.543355
+3 1:0.884920 2:0.375900 3:0.851078 4:0.021749 5:0.808034
+0 1:0.998045 2:0.539014 3:0.476142 4:0.714477 5:0.674686
+1 1:0.051806 2:0.881044 3:0.620053 4:0.324929 5:0.658388
+3 1:0.274004 2:0.305911 3:0.293288 4:0.200069 5:0.898098
+0 1:0.856834 2:0.648173 3:0.159354 4:0.792070 5:0.728352
+2 1:0.123342 2:0.957180 3:0.621960 4:0.480828 5:0.102538
+0 1:0.768011 2:0.718725 3:0.152013 4:0.877257 5:0.184134
+3 1:0.043845 2:0.106492 3:0.343883 4:0.476853 5:0.931994
+0 1:0.840979 2:0.724733 3:0.132602 4:0.660006 5:0.172654
+1 1:0.790630 2:0.387497 3:0.146874 4:0.272060 5:0.230415
+0 1:0.483095 2:0.769878 3:0.038515 4:0.773712 5:0.406844
+2 1:0.611834 2:0.054520 3:0.097503 4:0.852499 5:0.350596
+1 1:0.319831 2:0.741203 3:0.445348 4:0.305239 5:0.076114
+1 1:0.064030 2:0.230787 3:0.716318 4:0.777473 5:0.473709
+0 1:0.030735 2:0.086434 3:0.230371 4:0.166334 5:0.729940
+0 1:0.443625 2:0.148763 3:0.777603 4:0.469975 5:0.918963
+1 1:0.011012 2:0.474785 3:0.837294 4:0.976318 5:0.813301
+3 1:0.143489 2:0.537931 3:0.422035 4:0.032870 5:0.770511
+3 1:0.531663 2:0.379978 3:0.993619 4:0.126227 5:0.036455
diff --git a/mjolnir/test/fixtures/datasets/test.xgb.query b/mjolnir/test/fixtures/datasets/test.xgb.query
new file mode 100644
index 0000000..ea19cda
--- /dev/null
+++ b/mjolnir/test/fixtures/datasets/test.xgb.query
@@ -0,0 +1,7 @@
+16
+12
+12
+20
+12
+16
+12
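Each line of the `.query` sidecar above is the number of consecutive rows in the matching `.xgb` file that belong to one query group; xgboost's ranking objectives need these group boundaries, and the sizes must sum exactly to the row count (here, 7 groups covering the 100 fixture rows). A quick validation sketch — helper names are illustrative, not mjolnir's API:

```python
def parse_group_sizes(lines):
    # One integer group size per non-empty line of a .query sidecar.
    return [int(line) for line in lines if line.strip()]

def groups_cover_rows(sizes, num_rows):
    # Group sizes must partition the dataset exactly: no rows left
    # over, no group extending past the end of the file.
    return sum(sizes) == num_rows

sizes = parse_group_sizes("16\n12\n12\n20\n12\n16\n12\n".splitlines())
```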
diff --git a/mjolnir/test/fixtures/datasets/train.xgb b/mjolnir/test/fixtures/datasets/train.xgb
new file mode 100644
index 0000000..c079891
--- /dev/null
+++ b/mjolnir/test/fixtures/datasets/train.xgb
@@ -0,0 +1,100 @@
+2 1:0.276857 2:0.309246 3:0.443840 4:0.323302 5:0.251910
+0 1:0.696140 2:0.637534 3:0.229497 4:0.280137 5:0.617490
+2 1:0.413772 2:0.395534 3:0.429219 4:0.603654 5:0.517427
+0 1:0.661208 2:0.568080 3:0.704676 4:0.597076 5:0.923187
+0 1:0.448723 2:0.282579 3:0.554448 4:0.515220 5:0.520206
+0 1:0.690696 2:0.517868 3:0.721864 4:0.683105 5:0.645097
+0 1:0.028015 2:0.758854 3:0.510872 4:0.041371 5:0.938664
+2 1:0.628253 2:0.164138 3:0.174485 4:0.660370 5:0.661200
+3 1:0.156874 2:0.244076 3:0.834069 4:0.816389 5:0.267347
+0 1:0.598002 2:0.961438 3:0.115683 4:0.176997 5:0.802739
+0 1:0.210183 2:0.094221 3:0.315377 4:0.104599 5:0.078312
+2 1:0.155581 2:0.239757 3:0.390496 4:0.448591 5:0.363550
+0 1:0.842447 2:0.449796 3:0.699602 4:0.775102 5:0.551474
+1 1:0.282622 2:0.493173 3:0.308711 4:0.588665 5:0.015217
+1 1:0.849260 2:0.716784 3:0.801057 4:0.702118 5:0.467375
+1 1:0.318216 2:0.502648 3:0.139761 4:0.320343 5:0.677026
+1 1:0.496001 2:0.137648 3:0.962708 4:0.638394 5:0.922043
+0 1:0.113989 2:0.482813 3:0.614473 4:0.398461 5:0.411334
+3 1:0.910096 2:0.742803 3:0.802974 4:0.242235 5:0.741078
+3 1:0.026262 2:0.153817 3:0.866324 4:0.152764 5:0.079376
+2 1:0.866166 2:0.176590 3:0.501546 4:0.550038 5:0.252295
+2 1:0.635001 2:0.421436 3:0.400276 4:0.960437 5:0.958324
+0 1:0.511912 2:0.465321 3:0.391040 4:0.536889 5:0.262131
+3 1:0.061608 2:0.844460 3:0.775436 4:0.940628 5:0.472095
+2 1:0.968002 2:0.142278 3:0.746172 4:0.167117 5:0.888660
+1 1:0.264066 2:0.675206 3:0.761167 4:0.605750 5:0.458382
+3 1:0.848002 2:0.323477 3:0.985868 4:0.032662 5:0.831382
+2 1:0.173251 2:0.098697 3:0.002584 4:0.172282 5:0.467190
+3 1:0.464708 2:0.120618 3:0.178631 4:0.107419 5:0.458566
+3 1:0.085904 2:0.100485 3:0.439993 4:0.253806 5:0.225162
+3 1:0.309424 2:0.219766 3:0.979150 4:0.992466 5:0.781002
+2 1:0.978544 2:0.831527 3:0.924340 4:0.472260 5:0.854699
+1 1:0.838158 2:0.539927 3:0.033527 4:0.939022 5:0.858939
+0 1:0.478958 2:0.035299 3:0.321513 4:0.138026 5:0.344530
+1 1:0.420183 2:0.050261 3:0.168622 4:0.987962 5:0.081127
+3 1:0.529119 2:0.756857 3:0.525388 4:0.906647 5:0.015031
+2 1:0.757490 2:0.697273 3:0.672960 4:0.038027 5:0.567640
+1 1:0.877515 2:0.997738 3:0.140903 4:0.022510 5:0.275767
+1 1:0.707713 2:0.767267 3:0.146578 4:0.177046 5:0.390119
+2 1:0.126545 2:0.338924 3:0.290230 4:0.140717 5:0.193812
+1 1:0.438441 2:0.413887 3:0.715906 4:0.718078 5:0.037032
+1 1:0.394099 2:0.898491 3:0.851562 4:0.854452 5:0.200452
+3 1:0.554398 2:0.927987 3:0.506810 4:0.361222 5:0.339552
+1 1:0.651350 2:0.642931 3:0.992078 4:0.394123 5:0.356481
+2 1:0.323378 2:0.726388 3:0.278780 4:0.873152 5:0.760571
+2 1:0.134356 2:0.635057 3:0.085017 4:0.454603 5:0.756119
+2 1:0.980191 2:0.788963 3:0.817764 4:0.653687 5:0.556327
+0 1:0.008553 2:0.637250 3:0.401720 4:0.094888 5:0.483444
+3 1:0.422236 2:0.459310 3:0.469951 4:0.514213 5:0.806955
+2 1:0.794112 2:0.424444 3:0.341827 4:0.791480 5:0.355004
+1 1:0.650646 2:0.831209 3:0.360482 4:0.730525 5:0.253130
+0 1:0.899328 2:0.843065 3:0.668982 4:0.386870 5:0.767945
+3 1:0.459478 2:0.172103 3:0.115004 4:0.861834 5:0.497636
+3 1:0.502739 2:0.717756 3:0.324993 4:0.854486 5:0.691303
+3 1:0.694347 2:0.975218 3:0.749809 4:0.080061 5:0.216270
+0 1:0.204749 2:0.152912 3:0.364078 4:0.440324 5:0.064159
+0 1:0.731656 2:0.371814 3:0.359377 4:0.176542 5:0.820773
+0 1:0.247573 2:0.992306 3:0.796591 4:0.586521 5:0.681643
+0 1:0.057699 2:0.006356 3:0.445668 4:0.924739 5:0.316846
+2 1:0.542624 2:0.136532 3:0.997968 4:0.995610 5:0.035732
+1 1:0.394307 2:0.992210 3:0.872835 4:0.150469 5:0.909376
+2 1:0.857816 2:0.033190 3:0.880868 4:0.565811 5:0.665156
+1 1:0.942718 2:0.660032 3:0.178236 4:0.182812 5:0.087367
+2 1:0.831711 2:0.563728 3:0.384249 4:0.217499 5:0.294353
+0 1:0.876942 2:0.456118 3:0.673524 4:0.541833 5:0.584563
+3 1:0.576608 2:0.133179 3:0.125881 4:0.989455 5:0.918416
+1 1:0.750385 2:0.262965 3:0.308398 4:0.490595 5:0.058171
+2 1:0.650555 2:0.260526 3:0.103875 4:0.015988 5:0.965965
+0 1:0.256881 2:0.816037 3:0.178294 4:0.502192 5:0.982004
+2 1:0.197509 2:0.074589 3:0.638905 4:0.277399 5:0.396911
+2 1:0.354089 2:0.236236 3:0.093984 4:0.792231 5:0.763705
+1 1:0.616673 2:0.861175 3:0.073603 4:0.534483 5:0.935103
+3 1:0.251242 2:0.234976 3:0.922202 4:0.126030 5:0.462835
+1 1:0.120315 2:0.277710 3:0.911577 4:0.730788 5:0.657184
+2 1:0.106240 2:0.919003 3:0.650803 4:0.052300 5:0.717360
+2 1:0.047706 2:0.206002 3:0.439120 4:0.411402 5:0.153675
+2 1:0.573659 2:0.866770 3:0.863335 4:0.203366 5:0.234077
+1 1:0.035194 2:0.398743 3:0.510657 4:0.028964 5:0.619880
+3 1:0.729706 2:0.133357 3:0.864867 4:0.806695 5:0.681677
+3 1:0.745570 2:0.384475 3:0.758124 4:0.269669 5:0.134004
+2 1:0.625335 2:0.962249 3:0.365400 4:0.905473 5:0.574668
+0 1:0.127824 2:0.574420 3:0.925172 4:0.861584 5:0.651584
+0 1:0.283864 2:0.524622 3:0.822224 4:0.572662 5:0.054927
+0 1:0.634046 2:0.067980 3:0.154102 4:0.392269 5:0.630872
+0 1:0.715898 2:0.914851 3:0.767055 4:0.929095 5:0.545982
+1 1:0.265327 2:0.815794 3:0.787845 4:0.316523 5:0.931774
+3 1:0.989390 2:0.000113 3:0.033949 4:0.230697 5:0.298948
+1 1:0.894869 2:0.368693 3:0.138898 4:0.941772 5:0.320418
+0 1:0.230322 2:0.525224 3:0.071963 4:0.762348 5:0.378543
+1 1:0.615923 2:0.670154 3:0.398374 4:0.978306 5:0.445780
+2 1:0.225093 2:0.960227 3:0.620164 4:0.878473 5:0.018904
+0 1:0.527845 2:0.985475 3:0.661959 4:0.950368 5:0.760041
+3 1:0.153984 2:0.333098 3:0.228054 4:0.466053 5:0.466788
+1 1:0.721555 2:0.533151 3:0.648141 4:0.394710 5:0.959431
+2 1:0.838903 2:0.029601 3:0.941272 4:0.642579 5:0.999925
+3 1:0.704906 2:0.658367 3:0.698945 4:0.545432 5:0.448200
+1 1:0.244696 2:0.192652 3:0.219868 4:0.399418 5:0.217660
+1 1:0.834653 2:0.236075 3:0.051846 4:0.215874 5:0.286917
+0 1:0.391280 2:0.895263 3:0.587608 4:0.880774 5:0.779338
+2 1:0.215979 2:0.890866 3:0.158514 4:0.359504 5:0.023049
diff --git a/mjolnir/test/fixtures/datasets/train.xgb.query b/mjolnir/test/fixtures/datasets/train.xgb.query
new file mode 100644
index 0000000..52d733e
--- /dev/null
+++ b/mjolnir/test/fixtures/datasets/train.xgb.query
@@ -0,0 +1,7 @@
+18
+14
+14
+12
+16
+16
+10
diff --git a/mjolnir/test/fixtures/load_config/example_train.expect b/mjolnir/test/fixtures/load_config/example_train.expect
index 52bea1b..3199df6 100644
--- a/mjolnir/test/fixtures/load_config/example_train.expect
+++ b/mjolnir/test/fixtures/load_config/example_train.expect
@@ -31,12 +31,51 @@
executor-memory: 2G
files: /usr/lib/libhdfs.so.0.0.0
master: yarn
- packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+ packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
spark_command: /usr/lib/spark2/bin/spark-submit
spark_conf:
spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080
-Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080
+ spark.task.cpus: '1'
+ spark.yarn.executor.memoryOverhead: '512'
+ make_folds:
+ cmd_args:
+ input: hdfs://analytics-hadoop/user/pytest/mjolnir/marker
+ max-executors: '50'
+ num-folds: '5'
+ num-workers: '1'
+ output-dir: hdfs://analytics-hadoop/user/pytest/mjolnir/marker-folds_global
+ environment:
+ HOME: /home/pytest
+ PYSPARK_PYTHON: venv/bin/python
+ SPARK_CONF_DIR: /etc/spark2/conf
+ SPARK_HOME: /usr/lib/spark2
+ USER: pytest
+ mjolnir_utility: make_folds
+ mjolnir_utility_path: /srv/mjolnir/venv/bin/mjolnir-utilities.py
+ paths:
+ dir_exist: !!set
+ /etc/spark2/conf: null
+ file_exist: !!set
+ /srv/mjolnir/mjolnir_venv.zip: null
+ /srv/mjolnir/venv/bin/mjolnir-utilities.py: null
+ /usr/lib/spark2/bin/spark-submit: null
+ venv/bin/python: null
+ spark_args:
+ archives: /srv/mjolnir/mjolnir_venv.zip#venv
+ executor-cores: '1'
+ executor-memory: 2G
+ files: /usr/lib/libhdfs.so.0.0.0
+ master: yarn
+ packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+ repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
+ spark_command: /usr/lib/spark2/bin/spark-submit
+ spark_conf:
+ spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080
+ -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080
+ spark.dynamicAllocation.maxExecutors: '100'
+ spark.locality.wait: '0'
spark.task.cpus: '1'
spark.yarn.executor.memoryOverhead: '512'
pyspark:
@@ -60,7 +99,7 @@
executor-memory: 2G
files: /usr/lib/libhdfs.so.0.0.0
master: yarn
- packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+ packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
spark_command: /usr/lib/spark2/bin/pyspark
spark_conf:
@@ -89,7 +128,7 @@
executor-memory: 2G
files: /usr/lib/libhdfs.so.0.0.0
master: yarn
- packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+ packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
spark_command: /usr/lib/spark2/bin/pyspark
spark_conf:
@@ -99,8 +138,8 @@
spark.yarn.executor.memoryOverhead: '6144'
training_pipeline:
cmd_args:
- input: hdfs://analytics-hadoop/user/pytest/mjolnir/marker
- output: /home/pytest/training_size/marker_global
+ input: hdfs://analytics-hadoop/user/pytest/mjolnir/marker-folds_global
+ output: /home/pytest/training_size/marker-folds_global
environment:
HOME: /home/pytest
PYSPARK_PYTHON: venv/bin/python
@@ -125,14 +164,13 @@
executor-memory: 2G
files: /usr/lib/libhdfs.so.0.0.0
master: yarn
- packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+ packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
spark_command: /usr/lib/spark2/bin/spark-submit
spark_conf:
spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080
-Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080
spark.dynamicAllocation.executorIdleTimeout: 180s
- spark.sql.autoBroadcastJoinThreshold: '-1'
spark.task.cpus: '1'
spark.yarn.executor.memoryOverhead: '512'
wikis: []
@@ -170,7 +208,7 @@
executor-memory: 2G
files: /usr/lib/libhdfs.so.0.0.0
master: yarn
- packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+ packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
spark_command: /usr/lib/spark2/bin/spark-submit
spark_conf:
@@ -178,6 +216,45 @@
-Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080
spark.task.cpus: '1'
spark.yarn.executor.memoryOverhead: '512'
+ make_folds:
+ cmd_args:
+ input: hdfs://analytics-hadoop/user/pytest/mjolnir/marker
+ max-executors: '50'
+ num-folds: '3'
+ num-workers: '1'
+ output-dir: hdfs://analytics-hadoop/user/pytest/mjolnir/marker-folds_large
+ environment:
+ HOME: /home/pytest
+ PYSPARK_PYTHON: venv/bin/python
+ SPARK_CONF_DIR: /etc/spark2/conf
+ SPARK_HOME: /usr/lib/spark2
+ USER: pytest
+ mjolnir_utility: make_folds
+ mjolnir_utility_path: /srv/mjolnir/venv/bin/mjolnir-utilities.py
+ paths:
+ dir_exist: !!set
+ /etc/spark2/conf: null
+ file_exist: !!set
+ /srv/mjolnir/mjolnir_venv.zip: null
+ /srv/mjolnir/venv/bin/mjolnir-utilities.py: null
+ /usr/lib/spark2/bin/spark-submit: null
+ venv/bin/python: null
+ spark_args:
+ archives: /srv/mjolnir/mjolnir_venv.zip#venv
+ executor-cores: '1'
+ executor-memory: 2G
+ files: /usr/lib/libhdfs.so.0.0.0
+ master: yarn
+ packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+ repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
+ spark_command: /usr/lib/spark2/bin/spark-submit
+ spark_conf:
+ spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080
+ -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080
+ spark.dynamicAllocation.maxExecutors: '100'
+ spark.locality.wait: '0'
+ spark.task.cpus: '1'
+ spark.yarn.executor.memoryOverhead: '5120'
pyspark:
environment:
HOME: /home/pytest
@@ -199,7 +276,7 @@
executor-memory: 2G
files: /usr/lib/libhdfs.so.0.0.0
master: yarn
- packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+ packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
spark_command: /usr/lib/spark2/bin/pyspark
spark_conf:
@@ -228,7 +305,7 @@
executor-memory: 2G
files: /usr/lib/libhdfs.so.0.0.0
master: yarn
- packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+ packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
spark_command: /usr/lib/spark2/bin/pyspark
spark_conf:
@@ -238,12 +315,10 @@
spark.yarn.executor.memoryOverhead: '6144'
training_pipeline:
cmd_args:
- cv-jobs: '22'
+ cv-jobs: '80'
final-trees: '100'
- folds: '3'
- input: hdfs://analytics-hadoop/user/pytest/mjolnir/marker
- output: /home/pytest/training_size/marker_large
- workers: '3'
+ input: hdfs://analytics-hadoop/user/pytest/mjolnir/marker-folds_large
+ output: /home/pytest/training_size/marker-folds_large
environment:
HOME: /home/pytest
PYSPARK_PYTHON: venv/bin/python
@@ -265,20 +340,19 @@
archives: /srv/mjolnir/mjolnir_venv.zip#venv
driver-memory: 3G
executor-cores: '6'
- executor-memory: 4G
+ executor-memory: 2G
files: /usr/lib/libhdfs.so.0.0.0
master: yarn
- packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+ packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
spark_command: /usr/lib/spark2/bin/spark-submit
spark_conf:
spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080
-Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080
spark.dynamicAllocation.executorIdleTimeout: 180s
- spark.dynamicAllocation.maxExecutors: '65'
- spark.sql.autoBroadcastJoinThreshold: '-1'
+ spark.dynamicAllocation.maxExecutors: '85'
spark.task.cpus: '6'
- spark.yarn.executor.memoryOverhead: '9216'
+ spark.yarn.executor.memoryOverhead: '6144'
wikis:
- enwiki
- dewiki
@@ -315,7 +389,7 @@
executor-memory: 2G
files: /usr/lib/libhdfs.so.0.0.0
master: yarn
- packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+ packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
spark_command: /usr/lib/spark2/bin/spark-submit
spark_conf:
@@ -323,6 +397,45 @@
-Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080
spark.task.cpus: '1'
spark.yarn.executor.memoryOverhead: '512'
+ make_folds:
+ cmd_args:
+ input: hdfs://analytics-hadoop/user/pytest/mjolnir/marker
+ max-executors: '50'
+ num-folds: '5'
+ num-workers: '1'
+ output-dir: hdfs://analytics-hadoop/user/pytest/mjolnir/marker-folds_medium
+ environment:
+ HOME: /home/pytest
+ PYSPARK_PYTHON: venv/bin/python
+ SPARK_CONF_DIR: /etc/spark2/conf
+ SPARK_HOME: /usr/lib/spark2
+ USER: pytest
+ mjolnir_utility: make_folds
+ mjolnir_utility_path: /srv/mjolnir/venv/bin/mjolnir-utilities.py
+ paths:
+ dir_exist: !!set
+ /etc/spark2/conf: null
+ file_exist: !!set
+ /srv/mjolnir/mjolnir_venv.zip: null
+ /srv/mjolnir/venv/bin/mjolnir-utilities.py: null
+ /usr/lib/spark2/bin/spark-submit: null
+ venv/bin/python: null
+ spark_args:
+ archives: /srv/mjolnir/mjolnir_venv.zip#venv
+ executor-cores: '1'
+ executor-memory: 2G
+ files: /usr/lib/libhdfs.so.0.0.0
+ master: yarn
+ packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+ repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
+ spark_command: /usr/lib/spark2/bin/spark-submit
+ spark_conf:
+ spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080
+ -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080
+ spark.dynamicAllocation.maxExecutors: '100'
+ spark.locality.wait: '0'
+ spark.task.cpus: '1'
+ spark.yarn.executor.memoryOverhead: '1536'
pyspark:
environment:
HOME: /home/pytest
@@ -344,7 +457,7 @@
executor-memory: 2G
files: /usr/lib/libhdfs.so.0.0.0
master: yarn
- packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+ packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
spark_command: /usr/lib/spark2/bin/pyspark
spark_conf:
@@ -373,7 +486,7 @@
executor-memory: 2G
files: /usr/lib/libhdfs.so.0.0.0
master: yarn
- packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+ packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
spark_command: /usr/lib/spark2/bin/pyspark
spark_conf:
@@ -383,12 +496,10 @@
spark.yarn.executor.memoryOverhead: '6144'
training_pipeline:
cmd_args:
- cv-jobs: '70'
+ cv-jobs: '125'
final-trees: '100'
- folds: '5'
- input: hdfs://analytics-hadoop/user/pytest/mjolnir/marker
- output: /home/pytest/training_size/marker_medium
- workers: '1'
+ input: hdfs://analytics-hadoop/user/pytest/mjolnir/marker-folds_medium
+ output: /home/pytest/training_size/marker-folds_medium
environment:
HOME: /home/pytest
PYSPARK_PYTHON: venv/bin/python
@@ -409,21 +520,20 @@
spark_args:
archives: /srv/mjolnir/mjolnir_venv.zip#venv
driver-memory: 3G
- executor-cores: '6'
- executor-memory: 3G
+ executor-cores: '4'
+ executor-memory: 2G
files: /usr/lib/libhdfs.so.0.0.0
master: yarn
- packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+ packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
spark_command: /usr/lib/spark2/bin/spark-submit
spark_conf:
spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080
-Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080
spark.dynamicAllocation.executorIdleTimeout: 180s
- spark.dynamicAllocation.maxExecutors: '75'
- spark.sql.autoBroadcastJoinThreshold: '-1'
- spark.task.cpus: '6'
- spark.yarn.executor.memoryOverhead: '9216'
+ spark.dynamicAllocation.maxExecutors: '130'
+ spark.task.cpus: '4'
+ spark.yarn.executor.memoryOverhead: '2048'
wikis:
- itwiki
- ptwiki
@@ -462,7 +572,7 @@
executor-memory: 2G
files: /usr/lib/libhdfs.so.0.0.0
master: yarn
- packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+ packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
spark_command: /usr/lib/spark2/bin/spark-submit
spark_conf:
@@ -470,6 +580,45 @@
-Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080
spark.task.cpus: '1'
spark.yarn.executor.memoryOverhead: '512'
+ make_folds:
+ cmd_args:
+ input: hdfs://analytics-hadoop/user/pytest/mjolnir/marker
+ max-executors: '50'
+ num-folds: '5'
+ num-workers: '1'
+ output-dir: hdfs://analytics-hadoop/user/pytest/mjolnir/marker-folds_small
+ environment:
+ HOME: /home/pytest
+ PYSPARK_PYTHON: venv/bin/python
+ SPARK_CONF_DIR: /etc/spark2/conf
+ SPARK_HOME: /usr/lib/spark2
+ USER: pytest
+ mjolnir_utility: make_folds
+ mjolnir_utility_path: /srv/mjolnir/venv/bin/mjolnir-utilities.py
+ paths:
+ dir_exist: !!set
+ /etc/spark2/conf: null
+ file_exist: !!set
+ /srv/mjolnir/mjolnir_venv.zip: null
+ /srv/mjolnir/venv/bin/mjolnir-utilities.py: null
+ /usr/lib/spark2/bin/spark-submit: null
+ venv/bin/python: null
+ spark_args:
+ archives: /srv/mjolnir/mjolnir_venv.zip#venv
+ executor-cores: '1'
+ executor-memory: 2G
+ files: /usr/lib/libhdfs.so.0.0.0
+ master: yarn
+ packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+ repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
+ spark_command: /usr/lib/spark2/bin/spark-submit
+ spark_conf:
+ spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080
+ -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080
+ spark.dynamicAllocation.maxExecutors: '100'
+ spark.locality.wait: '0'
+ spark.task.cpus: '1'
+ spark.yarn.executor.memoryOverhead: '1024'
pyspark:
environment:
HOME: /home/pytest
@@ -491,7 +640,7 @@
executor-memory: 2G
files: /usr/lib/libhdfs.so.0.0.0
master: yarn
- packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+ packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
spark_command: /usr/lib/spark2/bin/pyspark
spark_conf:
@@ -520,7 +669,7 @@
executor-memory: 2G
files: /usr/lib/libhdfs.so.0.0.0
master: yarn
- packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+ packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
spark_command: /usr/lib/spark2/bin/pyspark
spark_conf:
@@ -530,12 +679,10 @@
spark.yarn.executor.memoryOverhead: '6144'
training_pipeline:
cmd_args:
- cv-jobs: '100'
+ cv-jobs: '125'
final-trees: '500'
- folds: '5'
- input: hdfs://analytics-hadoop/user/pytest/mjolnir/marker
- output: /home/pytest/training_size/marker_small
- workers: '1'
+ input: hdfs://analytics-hadoop/user/pytest/mjolnir/marker-folds_small
+ output: /home/pytest/training_size/marker-folds_small
environment:
HOME: /home/pytest
PYSPARK_PYTHON: venv/bin/python
@@ -560,17 +707,16 @@
executor-memory: 2G
files: /usr/lib/libhdfs.so.0.0.0
master: yarn
- packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
+ packages: ml.dmlc:xgboost4j-spark:0.8-wmf-1,org.wikimedia.search:mjolnir:0.3,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2
repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
spark_command: /usr/lib/spark2/bin/spark-submit
spark_conf:
spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080
-Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080
spark.dynamicAllocation.executorIdleTimeout: 180s
- spark.dynamicAllocation.maxExecutors: '105'
- spark.sql.autoBroadcastJoinThreshold: '-1'
+ spark.dynamicAllocation.maxExecutors: '130'
spark.task.cpus: '4'
- spark.yarn.executor.memoryOverhead: '6144'
+ spark.yarn.executor.memoryOverhead: '1024'
wikis:
- svwiki
- fawiki
@@ -582,7 +728,6 @@
- fiwiki
- jawiki
- arwiki
- - itwiki
- nlwiki
- zhwiki
- plwiki
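Each utility block in the expect fixtures above pairs `cmd_args`, `spark_args`, and `spark_conf` with a `spark_command`. As a rough sketch of how such a block could be flattened into a spark-submit invocation (the real assembly lives in mjolnir/utilities/spark.py, which is not shown in this hunk; the function name and flag ordering here are assumptions):

```python
def build_submit_command(cfg):
    """Hypothetical helper: flatten one utility block from the expect
    fixture into an argv list. Key names mirror the YAML above, but the
    actual logic in mjolnir/utilities/spark.py may differ."""
    cmd = [cfg['spark_command']]
    # spark_args become --key value pairs for spark-submit
    for key, value in sorted(cfg['spark_args'].items()):
        cmd += ['--' + key, value]
    # spark_conf entries become --conf key=value pairs
    for key, value in sorted(cfg['spark_conf'].items()):
        cmd += ['--conf', '%s=%s' % (key, value)]
    # finally the utility script, its subcommand, and its own arguments
    cmd.append(cfg['mjolnir_utility_path'])
    cmd.append(cfg['mjolnir_utility'])
    for key, value in sorted(cfg['cmd_args'].items()):
        cmd += ['--' + key, value]
    return cmd
```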
diff --git a/mjolnir/test/test_utils.py b/mjolnir/test/test_utils.py
new file mode 100644
index 0000000..69fecab
--- /dev/null
+++ b/mjolnir/test/test_utils.py
@@ -0,0 +1,69 @@
+from contextlib import contextmanager
+from copy import deepcopy
+import mjolnir.utils
+import pytest
+
+
+class FailTestException(Exception):
+ pass
+
+
+def make_multi_manager(pre_raise, post_raise, transitions):
+ pre_raise = list(pre_raise)
+ post_raise = list(post_raise)
+
+ @contextmanager
+ def f(data):
+ if pre_raise.pop(0):
+ transitions.append(('pre-raise', data))
+ raise FailTestException('pre-raise: ' + str(data))
+ transitions.append(('begin', data))
+ yield data
+ transitions.append(('end', data))
+ if post_raise.pop(0):
+ transitions.append(('post raise', data))
+ raise FailTestException('post raise: ' + str(data))
+
+ return mjolnir.utils.multi_with(f)
+
+
+def generate_fixtures():
+ tests = []
+ n = 3
+
+ success = [False] * n
+ expect_success = [
+ success,
+ success,
+ [('begin', i) for i in range(n)] +
+ [('inside', None)] +
+ [('end', i) for i in range(n)]]
+
+ tests.append(expect_success)
+
+ expect_failure = deepcopy(expect_success)
+ expect_failure[0][1] = True
+ expect_failure[2] = [('begin', 0), ('pre-raise', 1), ('end', 0)]
+ tests.append(expect_failure)
+
+ expect_failure = deepcopy(expect_success)
+ expect_failure[0][n - 1] = True
+ expect_failure[2] = [('begin', i) for i in range(n - 1)] + \
+ [('pre-raise', n - 1)] + \
+ [('end', i) for i in range(n-1)]
+ tests.append(expect_failure)
+
+ return ('pre_raise,post_raise,expected', tests)
+
+
[email protected](*generate_fixtures())
+def test_multi_with_cleanup(pre_raise, post_raise, expected):
+ n = 3
+ transitions = []
+ g = make_multi_manager(pre_raise, post_raise, transitions)
+ try:
+ with g(list(range(n))):
+ transitions.append(('inside', None))
+ except FailTestException:
+ pass
+ assert expected == transitions
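The test above exercises `mjolnir.utils.multi_with`, which turns a per-item context manager into one context manager over a list of items, cleaning up everything already entered when a later entry fails. A minimal approximation can be built on `contextlib.ExitStack`; note this sketch unwinds in reverse (LIFO) order, whereas the expectations above record 'end' transitions in forward order, so it is an approximation rather than the actual helper (whose implementation is not shown in this hunk):

```python
from contextlib import ExitStack, contextmanager


def multi_with(manager_factory):
    """Approximate sketch of a multi_with combinator.

    Given a factory producing a context manager per item, return a context
    manager over a list of items: all managers are entered up front and
    every manager that was successfully entered is exited, even when a
    later entry raises.
    """
    @contextmanager
    def multi(items):
        with ExitStack() as stack:
            # If any enter_context call raises, ExitStack exits the
            # managers already entered before re-raising.
            yield [stack.enter_context(manager_factory(item)) for item in items]
    return multi
```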
diff --git a/mjolnir/test/training/test_hyperopt.py b/mjolnir/test/training/test_hyperopt.py
index 6ef6f8a..c4c4782 100644
--- a/mjolnir/test/training/test_hyperopt.py
+++ b/mjolnir/test/training/test_hyperopt.py
@@ -20,7 +20,7 @@
).toDF(['wikiid', 'norm_query_id', 'query', 'label', 'features'])
-def test_minimize(df_train):
+def test_minimize(folds_b):
"Not an amazing test...basically sees if the happy path doesnt blow up"
space = {
'num_rounds': 50,
@@ -31,8 +31,7 @@
# xgboost is separately tested. Instead of going all the way into xgboost
# mock it out w/MockModel.
best_params, trails = mjolnir.training.hyperopt.minimize(
- df_train, MockModel, space, max_evals=5, num_folds=2,
- num_workers=1)
+ folds_b, MockModel, space, max_evals=5)
assert isinstance(best_params, dict)
# num_rounds should have been unchanged
assert 'num_rounds' in best_params
@@ -41,13 +40,23 @@
assert len(trails.trials) == 5
+class MockSummary(object):
+ def train(self):
+ return [1.]
+
+ def test(self):
+ return [1.]
+
+
class MockModel(object):
- def __init__(self, df, params, num_workers):
+ def __init__(self, df, params, train_matrix=None):
# Params that were passed to hyperopt
assert isinstance(params, dict)
assert 'max_depth' in params
assert params['num_rounds'] == 50
- assert num_workers == 1
def eval(self, df_test, j_groups=None, feature_col='features',
label_col='label'):
return 1.0
+
+ def summary(self):
+ return MockSummary()
diff --git a/mjolnir/test/training/test_tuning.py b/mjolnir/test/training/test_tuning.py
index eaa35b6..e14982b 100644
--- a/mjolnir/test/training/test_tuning.py
+++ b/mjolnir/test/training/test_tuning.py
@@ -47,14 +47,12 @@
).toDF(['wikiid', 'norm_query_id', 'query', 'label', 'features'])
-def test_cross_validate_plain_df(df_train):
+def test_cross_validate_plain_df(folds_a):
scores = mjolnir.training.tuning.cross_validate(
- df_train,
+ folds_a,
mjolnir.training.xgboost.train,
{'objective': 'rank:ndcg', 'eval_metric': 'ndcg@3', 'num_rounds': 1},
- # xgboost needs all jobs to have a worker assigned before it will
- # finish a round of training, so we have to be careful not to use
- # too many workers
- num_folds=2, num_workers=1, pool=None)
+ pool=None)
# one score for each fold
- assert len(scores) == 2
+ for fold, score in zip(folds_a, scores):
+ assert fold[0].keys() == score.keys()
diff --git a/mjolnir/test/training/test_xgboost.py b/mjolnir/test/training/test_xgboost.py
index c4642d0..e49e496 100644
--- a/mjolnir/test/training/test_xgboost.py
+++ b/mjolnir/test/training/test_xgboost.py
@@ -57,6 +57,30 @@
@pytest.fixture()
+def train_path(fixtures_dir):
+ """
+ Fake data generated with:
+
+ data = [(random.randint(0,3), [random.random() for i in range(5)]) for i in range(100)]
+ with open('train.xgb', 'w') as f:
+ for label, features in data:
+ f.write("%d %s\n" % (label, " ".join(["%d:%f" % (i+1, feat) for i, feat in enumerate(features)])))
+ with open('train.xgb.query', 'w') as f:
+ i = 0
+ while i < len(data):
+ next = min(len(data) - i, random.randint(10,20))
+ f.write("%d\n" % (next))
+ i += next
+ """
+ return fixtures_dir + '/datasets/train.xgb'
+
+
[email protected]()
+def test_path(fixtures_dir):
+ return fixtures_dir + '/datasets/test.xgb'
+
+
[email protected]()
def df_train(spark_context, hive_context):
rdd1 = spark_context.parallelize([
('foowiki', 'foo', 2, Vectors.dense([2.2])),
@@ -73,11 +97,11 @@
.toDF(['wikiid', 'query', 'label', 'features']))
-def test_train_minimum_params(df_train):
+def test_train_minimum_params(df_train, folds_a):
params = {'num_rounds': 1}
# TODO: Anything > 1 worker can easily get stuck, as a system
- # with 2 cpus will only spawn a single worker.
- model = mjolnir.training.xgboost.train(df_train, params, num_workers=1)
+ # with 2 cpus will only spawn a single executor.
+ model = mjolnir.training.xgboost.train(folds_a[0], params, 'train')
# What else can we practically assert?
df_transformed = model.transform(df_train)
@@ -86,25 +110,3 @@
# make sure train didn't clobber the incoming params
assert params['num_rounds'] == 1
-
-
-def _always_raise(*args, **kwargs):
- raise ValueError('should not be called')
-
-
-def test_train_pre_prepped(df_train):
- num_workers = 1
- params = {'num_rounds': 1}
-
- df_grouped, j_groups = mjolnir.training.xgboost.prep_training(
- df_train, num_workers)
- params['groupData'] = j_groups
-
- # TODO: This is probably not how we should make sure it isn't called..
- orig_prep_training = mjolnir.training.xgboost.prep_training
- try:
- mjolnir.training.xgboost.prep_training = _always_raise
- model = mjolnir.training.xgboost.train(df_grouped, params)
- assert 0.74 == pytest.approx(model.eval(df_grouped, j_groups), abs=0.01)
- finally:
- mjolnir.training.xgboost.prep_training = orig_prep_training
diff --git a/mjolnir/training/hyperopt.py b/mjolnir/training/hyperopt.py
index b4001eb..9976bb3 100644
--- a/mjolnir/training/hyperopt.py
+++ b/mjolnir/training/hyperopt.py
@@ -80,13 +80,13 @@
return state
-def minimize(df, train_func, space, max_evals=50, algo=hyperopt.tpe.suggest,
- num_folds=5, num_workers=5, cv_pool=None, trials_pool=None):
+def minimize(folds, train_func, space, max_evals=50, algo=hyperopt.tpe.suggest,
+ cv_pool=None, trials_pool=None):
"""Perform cross validated hyperparameter optimization of train_func
Parameters
----------
- df : pyspark.sql.DataFrame
+ folds : list of dict each with two keys, train and test
Features and Labels to optimize over
train_func : callable
Function to use for training individual models
@@ -97,10 +97,6 @@
algo : callable
The algorithm to use with hyperopt. See docs of hyperopt.fmin for more
details.
- num_folds : int
- Number of folds to split df into for cross validation
- num_workers : int
- Number of executors to use for each model training
cv_pool : multiprocessing.dummy.Pool or None
Controls the number of models to run in parallel. If None models
are trained sequentially.
@@ -117,13 +113,13 @@
"""
def objective(params):
- scores = mjolnir.training.tuning._cross_validate(
- folds, train_func, params, num_workers=num_workers,
- pool=cv_pool)
+ scores = mjolnir.training.tuning.cross_validate(
+ folds, train_func, params, pool=cv_pool)
# For now the only metric is NDCG, and hyperopt is a minimizer
- # so return the negative NDCG
- loss = [-s['test'] for s in scores]
- true_loss = [s['train'] - s['test'] for s in scores]
+ # so return the negative NDCG. Also makes the bold assumption
+ # we had at least two pieces of the fold named 'test' and 'train'
+ loss = [-s['test'][-1] for s in scores]
+ true_loss = [s['train'][-1] - s['test'][-1] for s in scores]
num_failures = sum([math.isnan(s) for s in loss])
if num_failures > 1:
return {
@@ -141,21 +137,13 @@
'true_loss_variance': np.var(true_loss),
}
- folds = mjolnir.training.tuning._make_folds(
- df, num_folds=num_folds, num_workers=num_workers, pool=cv_pool)
+ if trials_pool is None:
+ trials = hyperopt.Trials()
+ else:
+ trials = ThreadingTrials(trials_pool)
- try:
- if trials_pool is None:
- trials = hyperopt.Trials()
- else:
- trials = ThreadingTrials(trials_pool)
- best = hyperopt.fmin(objective, space, algo=algo,
- max_evals=max_evals, trials=trials)
- finally:
- for fold in folds:
- fold['train'].unpersist()
- fold['test'].unpersist()
-
+ best = hyperopt.fmin(objective, space, algo=algo,
+ max_evals=max_evals, trials=trials)
# hyperopt only returns the non-constant parameters in best. It seems
# more convenient to return all of them.
best_merged = space.copy()
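The new objective above takes the last entry of each fold's train/test objective history and converts NDCG (a gain) into a hyperopt loss by negation. That aggregation can be sketched standalone with the stdlib (the function name is mine; the status strings in the real code come from hyperopt's constants):

```python
import math
import statistics


def summarize_cv_scores(scores):
    """Turn per-fold train/test objective histories into hyperopt-style
    loss statistics, mirroring the objective() in the hunk above."""
    loss = [-s['test'][-1] for s in scores]
    true_loss = [s['train'][-1] - s['test'][-1] for s in scores]
    # As above: tolerate a single failed fold, bail out on more.
    if sum(math.isnan(l) for l in loss) > 1:
        return {'status': 'fail', 'failure': 'too many failed folds'}
    return {
        'status': 'ok',
        'loss': statistics.mean(loss),
        'loss_variance': statistics.pvariance(loss),
        'true_loss': statistics.mean(true_loss),
        'true_loss_variance': statistics.pvariance(true_loss),
    }
```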
diff --git a/mjolnir/training/tuning.py b/mjolnir/training/tuning.py
index d76fd1b..f106f88 100644
--- a/mjolnir/training/tuning.py
+++ b/mjolnir/training/tuning.py
@@ -93,6 +93,7 @@
----------
df : pyspark.sql.DataFrame
num_folds : int
+ output_column : str, optional
Yields
------
@@ -103,71 +104,6 @@
.withColumn(output_column, mjolnir.spark.add_meta(df._sc,
F.col(output_column), {
'num_folds': num_folds,
})))
-
-
-def _make_folds(df, num_folds, num_workers, pool):
- """Transform a DataFrame with assigned folds into many dataframes.
-
- The results of split and group_k_fold emit a single dataframe with folds
- marked on the individual rows. To do the resulting training we need individual
- dataframes for each test/train split within the folds. If the data has
- not already had folds assigned they will be assigned based on the 'num_folds'
- key in params. If not present 5 folds will be used.
-
- Also generates the appropriate group data for xgboost. This doesn't
- necessarily belong here, but it is relatively expensive to calculate, so we
- benefit significantly by doing it once before hyperparameter tuning, as
- opposed to doing it for each iteration.
-
- Parameters
- ----------
- df : pyspark.sql.DataFrame
- Input dataframe with a 'fold' column indicating which fold each row
- belongs to.
- num_folds : int
- Number of folds to create. If a 'fold' column already exists in df
- this will be ignored.
- num_workers : int
- Number of workers used to train each model. This is passed onto
- xgboost.prep_training to prepare each fold.
- pool : multiprocessing.dummy.Pool or None
- Used to prepare folds in parallel. If not provided folds will be
- generated sequentially.
-
- Returns
- -------
- list
- Generates a list of dicts, one for each fold. Each dict contains
- train and test keys containing DataFrames, along with j_train_groups
- and j_test_groups keys which contain py4j JavaObject instances
- corresponding to group data needed by xgboost for train/eval. The train
- and test dataframes have been persisted, so should be unpersisted
- when no longer needed.
- """
- if 'fold' in df.columns:
- assert num_folds == df.schema['fold'].metadata['num_folds']
- df_folds = df
- else:
- df_folds = group_k_fold(df, num_folds)
-
- def job(fold):
- condition = F.col('fold') == fold
- # TODO: de-couple xgboost from cv generation.
- df_train, j_train_groups = mjolnir.training.xgboost.prep_training(
- df_folds.where(~condition), num_workers)
- df_test, j_test_groups = mjolnir.training.xgboost.prep_training(
- df_folds.where(condition), num_workers)
- return {
- 'train': df_train,
- 'test': df_test,
- 'j_train_groups': j_train_groups,
- 'j_test_groups': j_test_groups
- }
-
- if pool is None:
- return map(job, range(num_folds))
- else:
- return pool.map(job, range(num_folds))
def _py4j_retry(fn, default_retval):
@@ -190,15 +126,14 @@
return with_retry
-def _cross_validate(folds, train_func, params, num_workers, pool):
+def cross_validate(folds, train_func, params, pool):
"""Perform cross validation of the provided folds
Parameters
----------
- folds : list
+ folds : list of dict containing train and test keys
train_func : callable
params : dict
- num_workers : int
pool : multiprocessing.dummy.Pool or None
Returns
@@ -206,55 +141,19 @@
list
"""
def job(fold):
- local_params = params.copy()
- local_params['groupData'] = fold['j_train_groups']
- model = train_func(fold['train'], local_params, num_workers=num_workers)
+ model = train_func(fold, params)
+ # TODO: Summary is hardcoded to train/test
return {
- 'train': model.eval(fold['train'], fold['j_train_groups']),
- 'test': model.eval(fold['test'], fold['j_test_groups']),
+ "train": model.summary().train(),
+ "test": model.summary().test(),
}
job_w_retry = _py4j_retry(job, {
- 'train': float('nan'),
- 'test': float('nan'),
+ "train": [float('nan')],
+ "test": [float('nan')],
})
if pool is None:
return map(job_w_retry, folds)
else:
return pool.map(job_w_retry, folds)
-
-
-def cross_validate(df, train_func, params, num_folds=5, num_workers=5, pool=None):
- """Perform cross-validation of the dataframe
-
- Parameters
- ----------
- df : pyspark.sql.DataFrame or list
- train_func : callable
- Function used to train a model. Must return a model that
- implements an eval method
- params : dict
- parameters to pass on to train_func
- num_folds : int
- Number of folds to split df into for cross validation
- num_workers : int
- Number of executors to use for each model training
- pool : multiprocessing.dummy.Pool, optional
- Used to prepare folds and run cross validations in parallel. If
- not provided all work will be done sequentially.
-
- Returns
- -------
- list
- List of dicts, each dict containing a train and test key. The values
- correspond the the model evaluation metric for the train and test
- data frames.
- """
- folds = _make_folds(df, num_folds, num_workers, pool)
- try:
- return _cross_validate(folds, train_func, params, num_workers=num_workers, pool=pool)
- finally:
- for fold in folds:
- fold['train'].unpersist()
- fold['test'].unpersist()
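With folds pre-built on disk, the new `cross_validate` reduces to mapping a train-and-score job over the folds, optionally through a thread pool. A minimal sketch of that shape (the real code above additionally wraps the job in `_py4j_retry`; the stub model here is mine):

```python
def cross_validate(folds, train_func, params, pool=None):
    """Train one model per fold and collect its train/test objective
    histories, sequentially or via a multiprocessing.dummy.Pool."""
    def job(fold):
        model = train_func(fold, params)
        return {'train': model.summary().train(),
                'test': model.summary().test()}
    mapper = map if pool is None else pool.map
    return list(mapper(job, folds))
```

Any `train_func` whose return value exposes a `summary()` with `train()`/`test()` histories plugs in, which is what lets the tests above swap in a mock model.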
diff --git a/mjolnir/training/xgboost.py b/mjolnir/training/xgboost.py
index 2e30a36..984c0e6 100644
--- a/mjolnir/training/xgboost.py
+++ b/mjolnir/training/xgboost.py
@@ -1,29 +1,22 @@
from __future__ import absolute_import
+import functools
import hyperopt
import math
import mjolnir.spark
import mjolnir.training.hyperopt
from multiprocessing.dummy import Pool
import numpy as np
+import pyspark
import pyspark.sql
from pyspark.sql import functions as F
import tempfile
-# Example Command line:
-# PYSPARK_PYTHON=venv/bin/python SPARK_CONF_DIR=/etc/spark/conf ~/spark-2.1.0-bin-hadoop2.6/bin/pyspark \
-# --master yarn \
-# --jars ~/mjolnir_2.11-1.0.jar \
-# --driver-class-path ~/mjolnir_2.11-1.0.jar \
-# --archives 'mjolnir_venv.zip#venv' \
-# --files /usr/lib/libhdfs.so.0.0.0 \
-# --executor-cores 4 \
-# --executor-memory 4G \
-# --conf spark.dynamicAllocation.maxExecutors=40 \
-# --conf spark.task.cpus=4
-
def prep_training(df, num_partitions=None):
"""Prepare a dataframe for training
+
+ This is no longer used for training. It can still be used to run predictions
+ or evaluations on a model. Training uses the make_folds utility now.
Ranking models in XGBoost require rows for the same query to be provided
consequtively within a single partition. It additionally requires a
@@ -106,70 +99,62 @@
return retval
-def train(df, params, num_workers=None):
+def train(fold, params, train_matrix=None):
"""Train a single xgboost ranking model.
- df : pyspark.sql.DataFrame
- Training data
+ fold: dict
+ map from split names to list of data partitions
params : dict
parameters to pass on to xgboost
- num_workers : int, optional
- The number of executors to train with. If not provided then
- 'groupData' *must* be present in params and num_workers will
- be set to the number of partitions in df.
Returns
-------
XGBoostModel
Trained xgboost model
"""
-
# hyperparameter tuning may have given us floats where we need
# ints, so this gets all the types right for Java. Also makes
# a copy of params so we don't modifying the incoming dict.
params = _coerce_params(params)
# TODO: Maybe num_rounds should just be external? But it's easier
# to do hyperparameter optimization with a consistent dict interface
- num_rounds = params['num_rounds']
- del params['num_rounds']
+ kwargs = {
+ 'num_rounds': 100,
+ 'early_stopping_round': 0,
+ }
+ if 'num_rounds' in params:
+ kwargs['num_rounds'] = params['num_rounds']
+ del params['num_rounds']
+ if 'early_stopping_round' in params:
+ kwargs['early_stopping_round'] = params['early_stopping_round']
+ del params['early_stopping_round']
+
# Set some sane defaults for ranking tasks
if 'objective' not in params:
params['objective'] = 'rank:ndcg'
if 'eval_metric' not in params:
params['eval_metric'] = 'ndcg@10'
- unpersist = False
- if num_workers is None:
- num_workers = df.rdd.getNumPartitions()
- if 'groupData' in params:
- assert params['groupData'].length() == num_workers
- df_grouped = df
+ # Convenience for some situations, but typically be explicit
+ # about the name of the matrix to train against.
+ if train_matrix is None:
+ train_matrix = "all" if "all" in fold else "train"
+
+ return XGBoostModel.trainWithFiles(fold, train_matrix, params, **kwargs)
+
+
+class XGBoostSummary(object):
+ def __init__(self, j_xgb_summary):
+ self._j_xgb_summary = j_xgb_summary
+
+ def train(self):
+ return list(self._j_xgb_summary.trainObjectiveHistory())
+
+ def test(self):
+ if self._j_xgb_summary.testObjectiveHistory().isEmpty():
+ return None
else:
- df_grouped, j_groups = prep_training(df, num_workers)
- unpersist = True
- params['groupData'] = j_groups
- elif 'groupData' in params:
- df_grouped = df
- else:
- df_grouped, j_groups = prep_training(df, num_workers)
- unpersist = True
- params['groupData'] = j_groups
-
- # We must have the same number of partitions here as workers the model will
- # be trained with, or xgboost4j-spark will repartition and the c++ library
- # will throw an exception. It's much cleaner to fail-fast here rather than
- # figuring out c++ errors through JNI from remote workers.
- assert df_grouped.rdd.getNumPartitions() == num_workers
- assert 'groupData' in params
- assert params['groupData'].length() == num_workers
-
- try:
-        return XGBoostModel.trainWithDataFrame(df_grouped, params, num_rounds,
-                                               num_workers, feature_col='features',
-                                               label_col='label')
- finally:
- if unpersist:
- df_grouped.unpersist()
+ return list(self._j_xgb_summary.testObjectiveHistory().get())
class XGBoostModel(object):
@@ -177,9 +162,8 @@
self._j_xgb_model = j_xgb_model
@staticmethod
-    def trainWithDataFrame(trainingData, params, num_rounds, num_workers, objective=None,
-                           eval_metric=None, missing=float('nan'),
-                           feature_col='features', label_col='label'):
+ def trainWithFiles(fold, train_matrix, params, num_rounds=100,
+ early_stopping_round=0):
"""Wrapper around scala XGBoostModel.trainWithRDD
This intentionally forwards to trainWithRDD, rather than
@@ -188,39 +172,34 @@
Parameters
----------
- trainingData : pyspark.sql.DataFrame
+ fold: dict
+ map from string name to list of data files for the split
+ train_matrix: str
+ name of split in fold to train against
params : dict
+ XGBoost training parameters
num_rounds : int
- num_workers : int
- objective : py4j.java_gateway.JavaObject, optional
- Allows providing custom objective implementation. (Default: None)
- eval_metric : py4j.java_gateway.JavaObject, optional
- Allows providing a custom evaluation metric implementation.
- (Default: None)
-        missing : float, optional
-            The value representing the missing value in the dataset. features with
-            this value will be removed and the vectors treated as sparse. (Default: nan)
- feature_col : string, optional
- The dataframe column holding feature vectors. (Default: features)
- label_col : string, optional
- The dataframe column holding labels. (Default: label)
+ Maximum number of boosting rounds to perform
+ early_stopping_round : int, optional
+ Quit training after this many rounds with no improvement in
+ test set eval. 0 disables behaviour. (Default: 0)
Returns
-------
mjolnir.training.xgboost.XGBoostModel
trained xgboost ranking model
"""
- sc = trainingData._sc
+ sc = pyspark.SparkContext.getOrCreate()
+ # Type is Seq[Map[String, String]]
+        j_fold = sc._jvm.PythonUtils.toSeq([sc._jvm.PythonUtils.toScalaMap(x) for x in fold])
+ # Type is Map[String, Any]
j_params = sc._jvm.scala.collection.immutable.HashMap()
for k, v in params.items():
j_params = j_params.updated(k, v)
-        j_rdd = sc._jvm.org.wikimedia.search.mjolnir.PythonUtils.toLabeledPoints(
-            trainingData._jdf, feature_col, label_col)
-
-        j_xgb_model = sc._jvm.ml.dmlc.xgboost4j.scala.spark.XGBoost.trainWithRDD(
-            j_rdd, j_params, num_rounds, num_workers,
-            objective, eval_metric, False, missing)
+        j_xgb_model = sc._jvm.org.wikimedia.search.mjolnir.MlrXGBoost.trainWithFiles(
+            sc._jsc, j_fold, train_matrix, j_params, num_rounds,
+            early_stopping_round)
return XGBoostModel(j_xgb_model)
def transform(self, df_test):
@@ -312,6 +291,9 @@
score = self._j_xgb_model.eval(j_rdd, 'test', None, 0, False, j_groups)
return float(score.split('=')[1].strip())
+ def summary(self):
+ return XGBoostSummary(self._j_xgb_model.summary())
+
def saveModelAsHadoopFile(self, sc, path):
        j_sc = sc._jvm.org.apache.spark.api.java.JavaSparkContext.toSparkContext(sc._jsc)
self._j_xgb_model.saveModelAsHadoopFile(path, j_sc)
@@ -333,7 +315,7 @@
return XGBoostModel(j_xgb_model)
-def tune(df, num_folds=5, num_cv_jobs=5, num_workers=5, initial_num_trees=100, final_num_trees=500):
+def tune(folds, stats, train_matrix, num_cv_jobs=5, initial_num_trees=100, final_num_trees=500):
"""Find appropriate hyperparameters for training df
This is far from perfect, hyperparameter tuning is a bit of a black art
@@ -352,16 +334,11 @@
Parameters
----------
- df : pyspark.sql.DataFrame
- num_folds : int, optional
- The number of cross validation folds to use while tuning. (Default: 5)
+ folds : list of dict containing train and test keys
+ stats : dict
+ stats about the fold from the make_folds utility script
num_cv_jobs : int, optional
The number of cross validation folds to train in parallel. (Default: 5)
- num_workers : int, optional
- The number of spark executors to use per fold for training. The total
- number of executors used will be (num_cv_jobs * num_workers). Generally
- prefer executors with more cpu's over a higher number of workers where
- possible. (Default: 5)
initial_num_trees: int, optional
        The number of trees to do most of the hyperparameter tuning with. This
        should be large enough to be reasonably representative of the final
@@ -379,26 +356,33 @@
performed, each containing a hyperopt.Trials object recording what
happened.
"""
+ cv_pool = None
+ if num_cv_jobs > 1:
+ cv_pool = Pool(num_cv_jobs)
- cv_pool = Pool(num_cv_jobs)
- trials_pool_size = int(math.floor(num_cv_jobs / num_folds))
+ # Configure the trials pool large enough to keep cv_pool full
+ num_folds = len(folds)
+ num_workers = len(folds[0])
+ trials_pool_size = int(math.floor(num_cv_jobs / (num_folds * num_workers)))
if trials_pool_size > 1:
print 'Running %d cross validations in parallel' % (trials_pool_size)
trials_pool = Pool(trials_pool_size)
else:
trials_pool = None
+ train_func = functools.partial(train, train_matrix=train_matrix)
+
def eval_space(space, max_evals):
"""Eval a space using standard hyperopt"""
best, trials = mjolnir.training.hyperopt.minimize(
- df, train, space, max_evals=max_evals, num_folds=num_folds,
- num_workers=num_workers, cv_pool=cv_pool, trials_pool=trials_pool)
+ folds, train_func, space, max_evals=max_evals,
+ cv_pool=cv_pool, trials_pool=trials_pool)
for k, v in space.items():
if not np.isscalar(v):
print 'best %s: %f' % (k, best[k])
return best, trials
- num_obs = df.count()
+ num_obs = stats['num_observations']
if num_obs > 8000000:
dataset_size = 'xlarge'
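The `num_rounds` / `early_stopping_round` handling added at the top of `train()` above is a pop-with-default pattern: selected keys are moved out of the hyperparameter dict into kwargs, falling back to defaults, so the remainder of `params` can be forwarded untouched. A standalone sketch of the same behaviour (the helper name is hypothetical, not part of this change):

```python
# Defaults mirror the values used in train() above.
DEFAULTS = {'num_rounds': 100, 'early_stopping_round': 0}

def split_train_kwargs(params, defaults=DEFAULTS):
    """Move known training kwargs out of a hyperparameter dict,
    applying defaults, without mutating the caller's dict."""
    params = dict(params)
    kwargs = {k: params.pop(k, default) for k, default in defaults.items()}
    return params, kwargs

params, kwargs = split_train_kwargs({'eta': 0.3, 'num_rounds': 250})
```

Popping the keys (rather than just reading them) matters because xgboost would otherwise warn about, or choke on, unrecognized parameters passed through to the booster.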
diff --git a/mjolnir/utilities/make_folds.py b/mjolnir/utilities/make_folds.py
new file mode 100644
index 0000000..c7ac04d
--- /dev/null
+++ b/mjolnir/utilities/make_folds.py
@@ -0,0 +1,205 @@
+"""
+Using the outputs of data_pipeline.py split our training data
+into multiple folds with a test/train split for each. Save these
+splits, to hdfs if requested, in the appropriate training algorithm
+binary format.
+"""
+from __future__ import absolute_import
+import argparse
+import collections
+import json
+import multiprocessing.dummy
+import mjolnir.feature_engineering
+import mjolnir.training.tuning
+from mjolnir.utils import as_local_paths, as_output_file, as_output_files, \
+ hdfs_mkdir, hdfs_unlink
+import os
+from pyspark import SparkContext
+from pyspark.sql import functions as F, HiveContext
+import xgboost
+
+
+def summarize_training_df(df, data_size):
+ if data_size > 10000000:
+ df = df.repartition(200)
+ summary = collections.defaultdict(dict)
+    for row in mjolnir.feature_engineering.explode_features(df).describe().collect():
+ statistic = row.summary
+        for field, value in (x for x in row.asDict().items() if x[0] != 'summary'):
+ summary[field][statistic] = value
+ return dict(summary)
+
+
+def make_df_stats(df, data_size):
+ return {
+ 'num_observations': data_size,
+ 'num_queries': df.select('query').drop_duplicates().count(),
+        'num_norm_queries': df.select('norm_query_id').drop_duplicates().count(),
+ 'features': df.schema['features'].metadata['features'],
+ 'summary': summarize_training_df(df, data_size),
+ }
+
+
+def write_xgb(in_path, out_path):
+ # Because xgboost hates us, reading a group file from disk
+ # is only supported in cli. From the c_api we have to provide it.
+ with open(in_path + '.query') as query:
+ query_boundaries = [int(line) for line in query]
+ # We cannot access the jvm from executors, only the driver,
+ # which means jvm xgboost is not available. For this one limited
+ # use case we must also have the python xgboost package.
+ dmat = xgboost.DMatrix(in_path)
+ dmat.set_group(query_boundaries)
+ dmat.save_binary(out_path)
+
+
+def write_wiki_folds(sc, df, num_workers, fold_col, path_format, features):
+ def write_binaries(rows):
+ # row is dict from split name (train/test) to path data can be found
+ for pair in rows:
+ i, row = pair
+            with as_local_paths(row.values(), with_query=True) as local_inputs, \
+                    as_output_files([path + '.xgb' for path in row.values()]) as local_outputs:
+                for local_input, local_output in zip(local_inputs, local_outputs):
+ write_xgb(local_input, local_output.name)
+
+ # Write out as text files from scala, much faster than shuffling to python
+ writer = sc._jvm.org.wikimedia.search.mjolnir.DataWriter(sc._jsc)
+ j_paths = writer.write(df._jdf, num_workers, path_format, fold_col)
+
+ # Convert everything to python objects
+ # in scala this is Array[Array[Map[String, String]]]
+ all_paths = []
+ for j_fold in j_paths:
+ fold = []
+ all_paths.append(fold)
+ for j_partition in j_fold:
+ partition = {str(k): str(v) for k, v in dict(j_partition).items()}
+ fold.append(partition)
+
+ # Enumerate gives a partition id used by partitionBy below
+ # This isn't a partition id of the data, but the stage we are making.
+    all_splits = list(enumerate(partition for fold in all_paths for partition in fold))
+ # For all the emitted folds create binary data files
+    sc.parallelize(all_splits, 1).partitionBy(len(all_splits), lambda x: x).foreachPartition(write_binaries)
+ # Cleanup the text/query output, keeping only the binary data files
+ hdfs_paths = []
+ for i, partition in all_splits:
+ for path in partition.values():
+ for extension in ['', '.query']:
+ hdfs_paths.append(path + extension)
+ hdfs_unlink(*hdfs_paths)
+ return all_paths
+
+
+def write_wiki_all(*args):
+ return write_wiki_folds(*args)[0]
+
+
+def make_folds(sc, sqlContext, input_dir, output_dir, wikis, zero_features, num_folds, num_workers, max_executors):
+ hdfs_mkdir(output_dir)
+ df = sqlContext.read.parquet(input_dir) \
+ .select('wikiid', 'query', 'features', 'label', 'norm_query_id')
+ if wikis:
+ df = df.where(F.col('wikiid').isin(wikis))
+
+    counts = df.groupBy('wikiid').agg(F.count(F.lit(1)).alias('n_obs')).collect()
+ counts = {row.wikiid: row.n_obs for row in counts}
+
+ if not wikis:
+ wikis = counts.keys()
+ else:
+ missing = set(wikis).difference(counts.keys())
+ for wiki in missing:
+ print 'No observations available for ' + wiki
+ wikis = list(set(wikis).intersection(counts.keys()))
+ if not wikis:
+ raise Exception('No wikis provided')
+
+ # sort to descending size, so mapping over them does the largest first
+ wikis.sort(reverse=True, key=lambda wiki: counts[wiki])
+
+ if zero_features:
+ df = mjolnir.feature_engineering.zero_features(df, zero_features)
+ if max_executors is None:
+ max_executors = num_workers
+
+ # TODO: Limit size?
+ pool = multiprocessing.dummy.Pool(len(wikis) * 3)
+ features = df.schema['features'].metadata['features']
+
+ df_fold = (
+ mjolnir.training.tuning.group_k_fold(df, num_folds)
+ .repartition(200, 'wikiid', 'query')
+ .sortWithinPartitions('wikiid', 'query'))
+
+ try:
+ df_fold.cache()
+ df_fold.count()
+
+ wiki_stats = {}
+ for wiki in wikis:
+ df_wiki = df_fold.where(F.col('wikiid') == wiki).drop('wikiid')
+ path_format = os.path.join(output_dir, wiki + '.%s.f%s.p%d')
+ wiki_stats[wiki] = {
+                'all': pool.apply_async(write_wiki_all, (sc, df_wiki, num_workers, None, path_format, features)),
+                'folds': pool.apply_async(write_wiki_folds, (sc, df_wiki, num_workers, 'fold', path_format, features)),
+                'stats': pool.apply_async(make_df_stats, (df_wiki, counts[wiki])),
+ }
+
+        wiki_stats = {wiki: {k: v.get() for k, v in stats.items()} for wiki, stats in wiki_stats.items()}
+ for wiki in wikis:
+ wiki_stats[wiki]['num_folds'] = num_folds
+ wiki_stats[wiki]['num_workers'] = num_workers
+ finally:
+ df_fold.unpersist()
+
+ with as_output_file(os.path.join(output_dir, 'stats.json')) as f:
+ f.write(json.dumps({
+ 'input_dir': input_dir,
+ 'wikis': wiki_stats,
+ }))
+
+
+def parse_arguments(argv):
+ parser = argparse.ArgumentParser(description='Prepare XGB binary matrices')
+ parser.add_argument(
+ '-i', '--input', dest='input_dir', type=str, required=True,
+        help='Input path, prefixed with hdfs://, to dataframe with labels and features')
+ parser.add_argument(
+ '-o', '--output-dir', dest='output_dir', type=str, required=True,
+ help='Output path, prefixed with hdfs://, to store binary matrices')
+ parser.add_argument(
+ '-f', '--num-folds', dest='num_folds', type=int, default=5,
+ help='The number of folds to split the data into')
+ parser.add_argument(
+ '-x', '--max-executors', dest='max_executors', type=int, default=None,
+ help='The maximum number of executors to use to write out folds')
+ parser.add_argument(
+ '-w', '--num-workers', dest='num_workers', type=int, default=1,
+ help='The number of workers used to train a single model')
+ parser.add_argument(
+        '-z', '--zero-features', dest='zero_features', type=str, required=False, default=None,
+ help='TODO')
+ parser.add_argument(
+ 'wikis', metavar='wiki', type=str, nargs='*',
+ help='List of wikis to build matrices for')
+ args = parser.parse_args(argv)
+ return dict(vars(args))
+
+
+def main(argv=None):
+ args = parse_arguments(argv)
+
+ app_name = 'MLR: writer binary folded datasets'
+ if args['wikis']:
+ app_name += ': ' + ', '.join(args['wikis'])
+ sc = SparkContext(appName=app_name)
+ sc.setLogLevel('WARN')
+ sqlContext = HiveContext(sc)
+
+ make_folds(sc, sqlContext, **args)
+
+
+if __name__ == '__main__':
+ main()
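For reference on the `.query` sidecar files that `write_xgb` reads above: each line is an integer group size, the count of consecutive rows belonging to one query, which is what `DMatrix.set_group` expects. A minimal sketch of producing those group sizes from query-sorted per-row query ids (hypothetical helper, not part of this change — the real grouping happens in the scala DataWriter):

```python
import itertools

def query_group_sizes(query_ids):
    """Collapse a query-sorted sequence of per-row query ids into the
    group sizes xgboost ranking objectives require, one per query.
    Rows for each query must already be contiguous, matching the
    sortWithinPartitions('wikiid', 'query') above."""
    return [sum(1 for _ in rows) for _, rows in itertools.groupby(query_ids)]

sizes = query_group_sizes(['q1', 'q1', 'q1', 'q2', 'q3', 'q3'])
```

The sizes must sum to the number of rows in the matrix, which is why the binary conversion has to happen where both the data file and its `.query` file are available locally.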
diff --git a/mjolnir/utilities/spark.py b/mjolnir/utilities/spark.py
index b911295..d49555d 100644
--- a/mjolnir/utilities/spark.py
+++ b/mjolnir/utilities/spark.py
@@ -226,6 +226,16 @@
'commands': {
'pyspark': ['spark_command'],
        'data_pipeline': ['spark_command', 'mjolnir_utility_path', 'mjolnir_utility'],
+ 'make_folds': {
+ 'spark_command': [],
+ 'mjolnir_utility_path': [],
+ 'mjolnir_utility': [],
+ 'spark_conf': [
+ 'spark.yarn.executor.memoryOverhead',
+ ],
+ 'spark_args': [],
+ 'cmd_args': ['num-workers', 'num-folds']
+ },
'training_pipeline': {
            # Using an empty array requires the key exists, even if it doesn't
# contain sub-properties.
@@ -238,7 +248,7 @@
'spark.task.cpus'
],
'spark_args': ['executor-memory', 'executor-cores'],
- 'cmd_args': ['workers', 'cv-jobs', 'folds', 'final-trees']
+ 'cmd_args': ['cv-jobs', 'final-trees']
}
}
})
@@ -406,6 +416,13 @@
subprocess_check_call(cmd, env=config['environment'])
+def make_folds(global_profile, profiles):
+ for name, profile in profiles.items():
+ config = profile['commands']['make_folds']
+        cmd = build_spark_command(config) + build_mjolnir_utility(config) + profile['wikis']
+ subprocess_check_call(cmd, env=config['environment'])
+
+
def train(global_profile, profiles):
"""Run mjolnir training pipeline"""
for name, profile in profiles.items():
@@ -497,9 +514,13 @@
'func': train,
'needed': ['training_pipeline'],
},
+ 'make_folds': {
+ 'func': make_folds,
+ 'needed': ['make_folds'],
+ },
'collect_and_train': {
'func': collect_and_train,
- 'needed': ['data_pipeline', 'training_pipeline'],
+ 'needed': ['data_pipeline', 'make_folds', 'training_pipeline'],
},
'shell': {
'func': lambda x, y: shell('pyspark', x, y),
diff --git a/mjolnir/utilities/training_pipeline.py b/mjolnir/utilities/training_pipeline.py
index 6918768..99a72c8 100644
--- a/mjolnir/utilities/training_pipeline.py
+++ b/mjolnir/utilities/training_pipeline.py
@@ -8,69 +8,65 @@
--artifacts 'mjolnir_venv.zip#venv' \
path/to/training_pipeline.py
"""
-
from __future__ import absolute_import
import argparse
-import collections
import datetime
import glob
+import json
import logging
import mjolnir.feature_engineering
import mjolnir.training.xgboost
+from mjolnir.utils import hdfs_open_read
import os
import pickle
-import sys
from pyspark import SparkContext
from pyspark.sql import HiveContext
-from pyspark.sql import functions as F
+import sys
-def summarize_training_df(df, data_size):
- """Generate a summary of min/max/stddev/mean of data frame
+def run_pipeline(sc, sqlContext, input_dir, output_dir, wikis, initial_num_trees, final_num_trees, num_cv_jobs):
+ with hdfs_open_read(os.path.join(input_dir, 'stats.json')) as f:
+ stats = json.loads(f.read())
- Parameters
- ----------
- df : pyspark.sql.DataFrame
- DataFrame used for training
- data_size : int
- Number of rows in df
+ wikis_available = set(stats['wikis'].keys())
+ if wikis:
+ missing = set(wikis).difference(wikis_available)
+ if missing:
+ raise Exception("Wikis not available: " + ", ".join(missing))
+ wikis = wikis_available.intersection(wikis)
+ else:
+ wikis = stats['wikis'].keys()
+ if not wikis:
+ raise Exception("No wikis provided")
- Returns
- -------
- dict
- Map from field name to a map of statistics about the field
- """
- if data_size > 10000000:
- df = df.repartition(200)
- summary = collections.defaultdict(dict)
-    for row in mjolnir.feature_engineering.explode_features(df).describe().collect():
- statistic = row.summary
-        for field, value in (x for x in row.asDict().items() if x[0] != 'summary'):
- summary[field][statistic] = value
- return dict(summary)
-
-
-def run_pipeline(sc, sqlContext, input_dir, output_dir, wikis, initial_num_trees, final_num_trees,
-                 num_workers, num_cv_jobs, num_folds, zero_features):
for wiki in wikis:
+ config = stats['wikis'][wiki]
+
print 'Training wiki: %s' % (wiki)
- df_hits_with_features = (
- sqlContext.read.parquet(input_dir)
- .where(F.col('wikiid') == wiki))
+ num_folds = config['num_folds']
+ if num_cv_jobs is None:
+ num_cv_jobs = num_folds
- data_size = df_hits_with_features.count()
- if data_size == 0:
- print 'No data found.' % (wiki)
- print ''
- continue
+ # Add extension matching training type
+ extension = ".xgb"
- if zero_features:
- df_hits_with_features = mjolnir.feature_engineering.zero_features(
- df_hits_with_features, zero_features)
+ # Add file extensions to all the folds
+ folds = config['folds']
+ for fold in folds:
+ for partition in fold:
+ for name, path in partition.items():
+ partition[name] = path + extension
+
+ # "all" data with no splits
+ all_paths = config['all']
+ for partition in all_paths:
+ for name, path in partition.items():
+ partition[name] = path + extension
tune_results = mjolnir.training.xgboost.tune(
- df_hits_with_features, num_folds=num_folds,
- num_cv_jobs=num_cv_jobs, num_workers=num_workers,
+ folds, config['stats'],
+ num_cv_jobs=num_cv_jobs,
+ train_matrix="train",
initial_num_trees=initial_num_trees,
final_num_trees=final_num_trees)
@@ -81,29 +77,21 @@
'wiki': wiki,
'input_dir': input_dir,
'training_datetime': datetime.datetime.now().isoformat(),
- 'num_observations': data_size,
-            'num_queries': df_hits_with_features.select('query').drop_duplicates().count(),
-            'num_norm_queries': df_hits_with_features.select('norm_query_id').drop_duplicates().count(),
-            'features': df_hits_with_features.schema['features'].metadata['features'],
- 'summary': summarize_training_df(df_hits_with_features, data_size)
+ 'dataset': config['stats'],
}
- # Train a model over all data with best params. Use a copy
- # so j_groups doesn't end up inside tune_results and prevent
- # pickle from serializing it.
+ # Train a model over all data with best params.
best_params = tune_results['params'].copy()
print 'Best parameters:'
for param, value in best_params.items():
print '\t%20s: %s' % (param, value)
- df_grouped, j_groups = mjolnir.training.xgboost.prep_training(
- df_hits_with_features, num_workers)
- best_params['groupData'] = j_groups
model = mjolnir.training.xgboost.train(
- df_grouped, best_params)
+ all_paths, best_params, train_matrix="all")
- tune_results['metrics']['train'] = model.eval(df_grouped, j_groups)
- df_grouped.unpersist()
- print 'train-ndcg@10: %.5f' % (tune_results['metrics']['train'])
+ tune_results['metrics'] = {
+ 'train': model.summary().train()
+ }
+ print 'train-ndcg@10: %.5f' % (tune_results['metrics']['train'][-1])
# Save the tune results somewhere for later analysis. Use pickle
# to maintain the hyperopt.Trials objects as is. It might be nice
@@ -120,26 +108,17 @@
        # Generate a feature map so xgboost can include feature names in the dump.
        # The final `q` indicates all features are quantitative values (floats).
-        features = df_hits_with_features.schema['features'].metadata['features']
+ features = config['stats']['features']
json_model_output = os.path.join(output_dir, 'model_%s.json' % (wiki))
with open(json_model_output, 'wb') as f:
f.write(model.dump(features))
print 'Wrote xgboost json model to %s' % (json_model_output)
# Write out the xgboost binary format as well, so it can be re-loaded
# and evaluated
- xgb_model_output = os.path.join(output_dir, 'model_%s.xgb' % (wiki))
- model.saveModelAsLocalFile(xgb_model_output)
- print 'Wrote xgboost binary model to %s' % (xgb_model_output)
+ model_output = os.path.join(output_dir, 'model_%s.xgb' % (wiki))
+ model.saveModelAsLocalFile(model_output)
+ print 'Wrote xgboost binary model to %s' % (model_output)
print ''
-
-
-def str_to_bool(value):
- if value.lower() in ['true', 'yes', '1']:
- return True
- elif value.lower() in ['false', 'no', '0']:
- return False
- else:
- raise ValueError("Unknown boolean string: " + value)
def parse_arguments(argv):
@@ -152,17 +131,10 @@
help='Path, on local filesystem, to directory to store the results of '
'model training to.')
parser.add_argument(
- '-w', '--workers', dest='num_workers', default=10, type=int,
-        help='Number of workers to train each individual model with. The total number '
- + 'of executors required is workers * cv-jobs. (Default: 10)')
- parser.add_argument(
'-c', '--cv-jobs', dest='num_cv_jobs', default=None, type=int,
        help='Number of cross validation folds to perform in parallel. Defaults to number '
             + 'of folds, to run all in parallel. If this is a multiple of the number '
+ 'of folds multiple cross validations will run in parallel.')
- parser.add_argument(
- '-f', '--folds', dest='num_folds', default=5, type=int,
- help='Number of cross validation folds to use. (Default: 5)')
parser.add_argument(
'--initial-trees', dest='initial_num_trees', default=100, type=int,
        help='Number of trees to perform hyperparameter tuning with. (Default: 100)')
@@ -171,21 +143,16 @@
        help='Number of trees in the final ensemble. If not provided the value from '
             + '--initial-trees will be used. (Default: None)')
parser.add_argument(
- '-z', '--zero-feature', dest='zero_features', type=str, nargs='+',
- help='Zero out feature in input')
- parser.add_argument(
'-v', '--verbose', dest='verbose', default=False, action='store_true',
help='Increase logging to INFO')
parser.add_argument(
'-vv', '--very-verbose', dest='very_verbose', default=False,
action='store_true',
help='Increase logging to DEBUG')
parser.add_argument(
- 'wikis', metavar='wiki', type=str, nargs='+',
+ 'wikis', metavar='wiki', type=str, nargs='*',
help='A wiki to perform model training for.')
args = parser.parse_args(argv)
- if args.num_cv_jobs is None:
- args.num_cv_jobs = args.num_folds
return dict(vars(args))
@@ -202,7 +169,8 @@
    # TODO: Set spark configuration? Some can't actually be set here though, so best might be to set all of it
# on the command line for consistency.
app_name = "MLR: training pipeline xgboost"
- app_name += ': ' + ', '.join(args['wikis'])
+ if args['wikis']:
+ app_name += ': ' + ', '.join(args['wikis'])
sc = SparkContext(appName=app_name)
sc.setLogLevel('WARN')
sqlContext = HiveContext(sc)
diff --git a/mjolnir/utils.py b/mjolnir/utils.py
new file mode 100644
index 0000000..7ac6422
--- /dev/null
+++ b/mjolnir/utils.py
@@ -0,0 +1,112 @@
+from __future__ import absolute_import
+import contextlib
+import os
+import subprocess
+import tempfile
+import urlparse
+
+
+def multi_with(f):
+ @contextlib.contextmanager
+ def manager(inputs, **kwargs):
+ def make_child(data):
+ with f(data, **kwargs) as inner:
+ yield inner
+
+ # children list keeps the children alive until the end of the function.
+    # python will exit the with statements above when cleaning up this list.
+ try:
+ children = []
+ output = []
+ for data in inputs:
+ child = make_child(data)
+ children.append(child)
+ output.append(child.send(None))
+ yield output
+ finally:
+ errors = []
+ for child in children:
+ try:
+ child.send(None)
+ except StopIteration:
+ pass
+ except Exception, e:
+ errors.append(e)
+ else:
+ errors.append(Exception("Expected StopIteration"))
+ if errors:
+ raise errors[0]
+
+ return manager
+
+
[email protected]
+def as_output_file(path):
+ if path[:7] == 'hdfs://':
+ f = tempfile.NamedTemporaryFile()
+ yield f
+ f.flush()
+ subprocess.check_call(['hdfs', 'dfs', '-copyFromLocal', f.name, path])
+ else:
+ if path[:len("file:/")] == "file:/":
+ path = path[len("file:"):]
+ with open(path, 'w') as f:
+ yield f
+
+
[email protected]
+def as_local_path(path, with_query=False):
+ if path[0] == '/':
+ yield path
+ elif path[:len("file:/")] == "file:/":
+ yield path[len("file:"):]
+ else:
+ # TODO: Untested
+ with tempfile.NamedTemporaryFile() as local:
+            subprocess.check_call(['hdfs', 'dfs', '-copyToLocal', path, local.name])
+ if with_query:
+ try:
+                    subprocess.check_call(['hdfs', 'dfs', '-copyToLocal', path + ".query", local.name + ".query"])
+ yield local.name
+ finally:
+ try:
+ os.unlink(local.name + ".query")
+ except OSError:
+ pass
+ else:
+ yield local.name
+
+
+as_local_paths = multi_with(as_local_path)
+as_output_files = multi_with(as_output_file)
+
+
+def hdfs_mkdir(path):
+ # Will error if it already exists
+ # TODO: Normalize error type?
+ if path[:7] == 'hdfs://':
+ subprocess.check_call(['hdfs', 'dfs', '-mkdir', path])
+ else:
+ os.mkdir(path)
+
+
+def hdfs_unlink(*paths):
+ remote = []
+ for path in paths:
+ if path[:7] == 'hdfs://':
+ remote.append(path)
+ else:
+ if path[:len("file:/")] == "file:/":
+ path = path[len("file:"):]
+ os.unlink(path)
+ if remote:
+ subprocess.check_call(['hdfs', 'dfs', '-rm'] + remote)
+
+
[email protected]
+def hdfs_open_read(path):
+ if path[:7] == 'hdfs://':
+ parts = urlparse.urlparse(path)
+ path = os.path.join('/mnt/hdfs', parts.path[1:])
+ with open(path, 'r') as f:
+ yield f
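The generator bookkeeping in `multi_with` exists to enter a variable number of context managers together and guarantee cleanup of every one that was entered, even if a later one fails. On Python 3.3+ (or with the `contextlib2` backport, since this codebase targets Python 2) the stdlib `ExitStack` covers the same ground. A sketch of that alternative, not part of this change:

```python
import contextlib
import tempfile

@contextlib.contextmanager
def as_local_paths_sketch(paths):
    # One temporary file per remote path; ExitStack unwinds every
    # context entered so far if a later enter_context() raises,
    # which is the error handling multi_with() implements by hand.
    with contextlib.ExitStack() as stack:
        files = [stack.enter_context(tempfile.NamedTemporaryFile())
                 for _ in paths]
        yield [f.name for f in files]
```

The hand-rolled version in this change has the advantage of running unmodified on the Python 2 deployment without an extra dependency.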
diff --git a/setup.py b/setup.py
index d2f22f1..9bd27e4 100644
--- a/setup.py
+++ b/setup.py
@@ -9,6 +9,9 @@
'kafka',
'pyyaml',
'hyperopt',
+ # python xgboost is only used for building
+ # binary datasets. Primary usage is from jvm.
+ 'xgboost',
# hyperopt requires networkx < 2.0, but doesn't say so
'networkx<2.0',
# pyspark requirements
--
To view, visit https://gerrit.wikimedia.org/r/403334
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: Ic8a1d4f405bcd1ad07dd53ef392f47d8dfa89246
Gerrit-PatchSet: 10
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <[email protected]>
Gerrit-Reviewer: DCausse <[email protected]>
Gerrit-Reviewer: EBernhardson <[email protected]>
Gerrit-Reviewer: Hashar <[email protected]>
Gerrit-Reviewer: jenkins-bot <>