Smalyshev has submitted this change and it was merged.
Change subject: Setup oozie workflow for transferToES.py
......................................................................
Setup oozie workflow for transferToES.py
Tested the pipeline and this appropriately kicks off the transferToES
script. Was tested with a few custom parameters pointing everything
into /user/ebernhardson, will need to retest with the production
values.
Change-Id: I45f93e22dc7d4a9cdf9e2f5fd26f2eb3c248f44e
---
M oozie/datasets.xml
A oozie/transfer_to_es/README.md
R oozie/transfer_to_es/all.dblist
A oozie/transfer_to_es/bundle.properties
A oozie/transfer_to_es/bundle.xml
A oozie/transfer_to_es/coordinator.xml
R oozie/transfer_to_es/hostmap.json
A oozie/transfer_to_es/lib/.gitkeep
R oozie/transfer_to_es/makeHostnames.py
R oozie/transfer_to_es/transferToES.py
R oozie/transfer_to_es/transferToFile.py
A oozie/transfer_to_es/workflow.xml
12 files changed, 389 insertions(+), 2 deletions(-)
Approvals:
Smalyshev: Looks good to me, approved
jenkins-bot: Verified
diff --git a/oozie/datasets.xml b/oozie/datasets.xml
index c90131d..68c7ec2 100644
--- a/oozie/datasets.xml
+++ b/oozie/datasets.xml
@@ -28,7 +28,7 @@
frequency="${coord:days(days_aggregated)}"
initial-instance="${start_time}"
timezone="Universal">
-
<uri-template>${discovery_data_directory}/popularity_score/agg_days=${days_aggregated}/year=${YEAR}/month=${"$"}{MONTH
+ 0}/day=${"$"}{DAY + 0}</uri-template>
+
<uri-template>${popularity_score_data_directory}/agg_days=${days_aggregated}/year=${YEAR}/month=${"$"}{MONTH
+ 0}/day=${"$"}{DAY + 0}</uri-template>
<done-flag>_SUCCESS</done-flag>
</dataset>
</datasets>
diff --git a/oozie/transfer_to_es/README.md b/oozie/transfer_to_es/README.md
new file mode 100644
index 0000000..347cf7f
--- /dev/null
+++ b/oozie/transfer_to_es/README.md
@@ -0,0 +1,18 @@
+# Transfer phase of discovery popularity score to elasticsearch clusters
+
+This job is responsible for transferring popularity score information from
+the hadoop cluster to the elasticsearch cluster.
+
+# Outline
+
+* ```bundle.properties``` is used to define parameters to the pipeline
+* ```bundle.xml``` Defines a separate import job for each active cluster
+* ```coordinator.xml``` Defines when and how to run the import job
+* ```workflow.xml```
+ * Runs a spark app to read in the scoring information from
+ discovery.popularity_score and write it to the specified elasticsearch
+ URL.
+
+Note that this job uses the popularity_score dataset. If the aggregation job
+does not have the _SUCCESS done-flag in the directory, the scores for that
+week will not be transferred until it does.
diff --git a/all.dblist b/oozie/transfer_to_es/all.dblist
similarity index 100%
rename from all.dblist
rename to oozie/transfer_to_es/all.dblist
diff --git a/oozie/transfer_to_es/bundle.properties
b/oozie/transfer_to_es/bundle.properties
new file mode 100644
index 0000000..8c9286e
--- /dev/null
+++ b/oozie/transfer_to_es/bundle.properties
@@ -0,0 +1,68 @@
+# Configures a coordinator to manage automatically exporting
+# popularity_score to elasticsearch.
+#
+# Any of the following properties are overridable with -D.
+# Usage:
+# oozie job -Duser=$USER -Dstart_time=2015-12-01T00:00Z -submit \
+# -config oozie/transfer_to_es/bundle.properties
+#
+# NOTE: Both *_oozie_directory must be synced to HDFS so that all relevant
+# .xml files exist there when this job is submitted.
+
+# Base path in HDFS to this repository oozie files.
+# Other files will be used relative to this path.
+discovery_oozie_directory = ${name_node}/wmf/discovery/current/oozie
+
+# Base path in HDFS to the analytics team oozie files.
+# Other files will be used relative to this path
+analytics_oozie_directory = ${name_node}/wmf/refinery/current/oozie
+
+name_node = hdfs://analytics-hadoop
+job_tracker = resourcemanager.analytics.eqiad.wmnet:8032
+queue_name = default
+
+user = hdfs
+
+# HDFS path to coordinator to run the transfer_to_es export.
+coordinator_file =
${discovery_oozie_directory}/transfer_to_es/coordinator.xml
+
+# HDFS path to workflow to run the transfer_to_es export.
+workflow_file =
${discovery_oozie_directory}/transfer_to_es/workflow.xml
+
+# HDFS path to popularity score dataset definitions
+discovery_datasets_file = ${discovery_oozie_directory}/datasets.xml
+discovery_data_directory = ${name_node}/wmf/data/wmf/discovery
+popularity_score_data_directory =
${discovery_data_directory}/popularity_score
+popularity_score_table = discovery.popularity_score
+
+# Number of days page views are aggregated over.
+days_aggregated = 7
+
+# Number of updates to send in each batch to elasticsearch
+transfer_to_es_batch_size = 1000
+
+# Initial import time of the popularity score dataset. This is one week after
the page_id
+# field was added to pageview_hourly.
+start_time = 2015-12-08T00:00Z
+
+# Time to stop running this coordinator. Year 3000 == never!
+stop_time = 3000-01-01T00:00Z
+
+# Workflow to send an error email
+send_error_email_workflow_file =
${analytics_oozie_directory}/util/send_error_email/workflow.xml
+
+# Spark assembly jar path needs to be given to spark conf
+spark_assembly_jar =
${name_node}/user/spark/share/lib/spark-assembly.jar
+
+# Spark load settings - Better to set them here, configurable through oozie
+# when spark allocates this resource. This partially controls the amount
+# of concurrency between the hadoop cluster and the elasticsearch cluster
+# during export.
+spark_number_executors = 3
+spark_executor_memory = 1G
+spark_driver_memory = 1G
+
+# Coordinator to start.
+oozie.bundle.application.path =
${discovery_oozie_directory}/transfer_to_es/bundle.xml
+oozie.use.system.libpath = true
+oozie.action.external.stats.write = true
diff --git a/oozie/transfer_to_es/bundle.xml b/oozie/transfer_to_es/bundle.xml
new file mode 100644
index 0000000..c8223ec
--- /dev/null
+++ b/oozie/transfer_to_es/bundle.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<bundle-app xmlns="uri:oozie:bundle:0.2"
+ name="transfer_to_es-bundle">
+
+ <parameters>
+
+ <!-- Required properties -->
+ <property><name>queue_name</name></property>
+ <property><name>coordinator_file</name></property>
+ <property><name>name_node</name></property>
+ <property><name>job_tracker</name></property>
+
+ <property><name>start_time</name></property>
+ <property><name>stop_time</name></property>
+ </parameters>
+
+ <coordinator name="transfer_to_es-eqiad-coord">
+ <app-path>${coordinator_file}</app-path>
+ <configuration>
+ <property>
+ <name>elasticsearch_url</name>
+ <value>http://search.svc.eqiad.wmnet</value>
+ </property>
+ </configuration>
+ </coordinator>
+
+ <coordinator name="transfer_to_es-codfw-coord">
+ <app-path>${coordinator_file}</app-path>
+ <configuration>
+ <property>
+ <name>elasticsearch_url</name>
+ <value>http://search.svc.codfw.wmnet</value>
+ </property>
+ </configuration>
+ </coordinator>
+
+</bundle-app>
diff --git a/oozie/transfer_to_es/coordinator.xml
b/oozie/transfer_to_es/coordinator.xml
new file mode 100644
index 0000000..4a8edfe
--- /dev/null
+++ b/oozie/transfer_to_es/coordinator.xml
@@ -0,0 +1,100 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<coordinator-app xmlns="uri:oozie:coordinator:0.4"
+ name="transfer_to_es-coord"
+ frequency="${coord:days(days_aggregated)}"
+ start="${start_time}"
+ end="${stop_time}"
+ timezone="Universal">
+
+ <parameters>
+
+ <!-- Required properties. -->
+ <property><name>workflow_file</name></property>
+ <property><name>start_time</name></property>
+ <property><name>stop_time</name></property>
+
+ <property><name>discovery_data_directory</name></property>
+ <property><name>discovery_datasets_file</name></property>
+ <property><name>popularity_score_data_directory</name></property>
+ <property><name>popularity_score_table</name></property>
+
+ <property><name>days_aggregated</name></property>
+
+ <property><name>queue_name</name></property>
+ <property><name>name_node</name></property>
+ <property><name>job_tracker</name></property>
+
+ <property><name>spark_number_executors</name></property>
+ <property><name>spark_executor_memory</name></property>
+ <property><name>spark_driver_memory</name></property>
+
+ <property><name>send_error_email_workflow_file</name></property>
+
+ <property><name>discovery_oozie_directory</name></property>
+
+ <property><name>transfer_to_es_batch_size</name></property>
+ <property><name>elasticsearch_url</name></property>
+ </parameters>
+
+ <controls>
+ <!--
+ If a materialized job sits around for too long we could end up
+ doing the export during the busiest part of the day for the ES
+ servers. Rather than risking an overload of the servers timeout
+ the job after two hours.
+ -->
+ <timeout>120</timeout>
+
+ <!--
+        transferring these documents can load up elasticsearch, additionally
+ one instance running would overwrite the work of another instance
+ running at the same time. Limit concurrency to prevent this. The
+ timeout setting combined with once a week runs should prevent this
+        anyway, but this is set for safety's sake.
+ -->
+ <concurrency>1</concurrency>
+
+ <!--
+ Since we expect only one incarnation per weekly dataset, the
+        default throttle of 12 is way too high, and there is no need
+ to keep that many materialized jobs around.
+ -->
+ <throttle>1</throttle>
+ </controls>
+
+ <datasets>
+ <!--
+ Include discovery datasets files.
+ $discovery_datasets_file will be used as the output events
+ -->
+ <include>${discovery_datasets_file}</include>
+ </datasets>
+
+ <input-events>
+ <data-in name="popularity_score_input" dataset="popularity_score">
+ <instance>${coord:current(0)}</instance>
+ </data-in>
+ </input-events>
+
+ <action>
+ <workflow>
+ <app-path>${workflow_file}</app-path>
+ <configuration>
+
+ <property>
+ <name>year</name>
+ <value>${coord:formatTime(coord:nominalTime(),
"y")}</value>
+ </property>
+ <property>
+ <name>month</name>
+ <value>${coord:formatTime(coord:nominalTime(),
"M")}</value>
+ </property>
+ <property>
+ <name>day</name>
+ <value>${coord:formatTime(coord:nominalTime(),
"d")}</value>
+ </property>
+
+ </configuration>
+ </workflow>
+ </action>
+</coordinator-app>
diff --git a/hostmap.json b/oozie/transfer_to_es/hostmap.json
similarity index 100%
rename from hostmap.json
rename to oozie/transfer_to_es/hostmap.json
diff --git a/oozie/transfer_to_es/lib/.gitkeep
b/oozie/transfer_to_es/lib/.gitkeep
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/oozie/transfer_to_es/lib/.gitkeep
diff --git a/makeHostnames.py b/oozie/transfer_to_es/makeHostnames.py
similarity index 100%
rename from makeHostnames.py
rename to oozie/transfer_to_es/makeHostnames.py
diff --git a/transferToES.py b/oozie/transfer_to_es/transferToES.py
similarity index 94%
rename from transferToES.py
rename to oozie/transfer_to_es/transferToES.py
index 89218e0..4a29bcb 100644
--- a/transferToES.py
+++ b/oozie/transfer_to_es/transferToES.py
@@ -6,6 +6,7 @@
import requests
import json
import logging
+import subprocess
oparser = OptionParser()
oparser.add_option("-s", "--source", dest="source", help="source for the
data", metavar="SOURCE")
@@ -19,7 +20,10 @@
ITEMS_PER_BATCH = int(options.batch)
SOURCE = options.source
TARGET = options.url
-hostMap = json.load(open(options.hostmap))
+if options.hostmap[0:24] == 'hdfs://analytics-hadoop/':
+ hostMap = json.loads(subprocess.check_output(["hdfs", "dfs", "-cat",
options.hostmap[23:]]))
+else:
+ hostMap = json.load(open(options.hostmap))
print "Transferring from %s to %s" % (SOURCE, TARGET)
diff --git a/transferToFile.py b/oozie/transfer_to_es/transferToFile.py
similarity index 100%
rename from transferToFile.py
rename to oozie/transfer_to_es/transferToFile.py
diff --git a/oozie/transfer_to_es/workflow.xml
b/oozie/transfer_to_es/workflow.xml
new file mode 100644
index 0000000..51679b5
--- /dev/null
+++ b/oozie/transfer_to_es/workflow.xml
@@ -0,0 +1,160 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<workflow-app xmlns="uri:oozie:workflow:0.4"
+
name="discovery-transfer_to_es-${popularity_score_table}-${year},${month},${day}->${elasticsearch_url}-wf">
+
+ <parameters>
+
+ <!-- Default values for inner oozie settings -->
+ <property>
+ <name>oozie_launcher_queue_name</name>
+ <value>${queue_name}</value>
+ </property>
+ <property>
+ <name>oozie_launcher_memory</name>
+ <value>256</value>
+ </property>
+
+ <!-- Required properties -->
+ <property><name>queue_name</name></property>
+ <property><name>name_node</name></property>
+ <property><name>job_tracker</name></property>
+
+ <property>
+ <name>spark_assembly_jar</name>
+ <description>HDFS path to spark-assembly.jar</description>
+ </property>
+ <property>
+ <name>spark_number_executors</name>
+ <description>Number of executors to run job with</description>
+ </property>
+ <property>
+ <name>spark_executor_memory</name>
+ <description>Amount of memory to reserve on executor
instances</description>
+ </property>
+ <property>
+ <name>spark_driver_memory</name>
+ <description>Amount of memory to reserve on driver
instance</description>
+ </property>
+
+ <property>
+ <name>popularity_score_table</name>
+ <description>Fully qualified name of the popularity_score table in
hive</description>
+ </property>
+ <property>
+ <name>popularity_score_data_directory</name>
+ <description>The popularity_score directory path in
HDFS</description>
+ </property>
+ <property>
+ <name>popularity_score_partition_directory</name>
+
<value>${popularity_score_data_directory}/agg_days=${days_aggregated}/year=${year}/month=${month}/day=${day}</value>
+ </property>
+
+ <property>
+ <name>days_aggregated</name>
+ <description>Number of days to aggregate page views
over</description>
+ </property>
+
+ <property>
+ <name>year</name>
+            <description>Exclusive four digit year portion of date to end
aggregation on</description>
+ </property>
+ <property>
+ <name>month</name>
+ <description>Exclusive unpadded month portion of date to end
aggregation on</description>
+ </property>
+ <property>
+ <name>day</name>
+ <description>Exclusive unpadded day portion of date to end
aggregation on</description>
+ </property>
+
+ <property>
+ <name>send_error_email_workflow_file</name>
+ <description>Workflow for sending an email</description>
+ </property>
+
+ <property>
+ <name>discovery_oozie_directory</name>
+            <description>The path to this repository's oozie directory in
HDFS</description>
+ </property>
+
+ <property>
+ <name>pyspark_transfer_to_es_script</name>
+
<value>${discovery_oozie_directory}/transfer_to_es/transferToES.py</value>
+ <description>Path to Pyspark script to run for generating the
popularity score in HDFS.</description>
+ </property>
+ <property>
+ <name>transfer_to_es_hostmap_file</name>
+
<value>${discovery_oozie_directory}/transfer_to_es/hostmap.json</value>
+ <description>Path to JSON hostmap for domain -> wiki conversion in
HDFS</description>
+ </property>
+ <property>
+ <name>transfer_to_es_batch_size</name>
+ <description>Items per batch to load into ES</description>
+ </property>
+ <property>
+ <name>elasticsearch_url</name>
+ <description>URL to send the data to with no trailing
/</description>
+ </property>
+
+ </parameters>
+
+ <start to="transfer"/>
+
+ <action name="transfer">
+ <spark xmlns="uri:oozie:spark-action:0.1">
+ <job-tracker>${job_tracker}</job-tracker>
+ <name-node>${name_node}</name-node>
+ <configuration>
+ <property>
+ <name>mapreduce.job.queuename</name>
+ <value>${queue_name}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapred.job.queue.name</name>
+ <value>${oozie_launcher_queue_name}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapreduce.map.memory.mb</name>
+ <value>${oozie_launcher_memory}</value>
+ </property>
+ </configuration>
+ <master>yarn-cluster</master>
+ <mode>cluster</mode>
+ <name>Discovery Transfer To Elasticsearch</name>
+ <jar>${pyspark_transfer_to_es_script}</jar>
+ <spark-opts>--conf spark.yarn.jar=${spark_assembly_jar}
--executor-memory ${spark_executor_memory} --driver-memory
${spark_driver_memory} --num-executors ${spark_number_executors} --queue
${queue_name} --conf spark.yarn.appMasterEnv.SPARK_HOME=/bogus</spark-opts>
+ <!-- arguments for pyspark script -->
+ <arg>--source</arg>
+ <arg>${popularity_score_partition_directory}</arg>
+ <arg>--url</arg>
+ <arg>${elasticsearch_url}</arg>
+ <arg>--batch</arg>
+ <arg>${transfer_to_es_batch_size}</arg>
+ <arg>--hostmap</arg>
+ <arg>${transfer_to_es_hostmap_file}</arg>
+ </spark>
+
+ <ok to="end"/>
+ <error to="kill"/>
+ </action>
+
+ <action name="send_error_email">
+ <sub-workflow>
+ <app-path>${send_error_email_workflow_file}</app-path>
+ <propagate-configuration/>
+ <configuration>
+ <property>
+ <name>parent_name</name>
+ <value>${wf:name()}</value>
+ </property>
+ </configuration>
+ </sub-workflow>
+ <ok to="kill"/>
+ <error to="kill"/>
+ </action>
+
+ <kill name="kill">
+ <message>Action failed, error
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+ </kill>
+ <end name="end"/>
+</workflow-app>
--
To view, visit https://gerrit.wikimedia.org/r/257436
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I45f93e22dc7d4a9cdf9e2f5fd26f2eb3c248f44e
Gerrit-PatchSet: 11
Gerrit-Project: wikimedia/discovery/analytics
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <[email protected]>
Gerrit-Reviewer: DCausse <[email protected]>
Gerrit-Reviewer: EBernhardson <[email protected]>
Gerrit-Reviewer: Joal <[email protected]>
Gerrit-Reviewer: Smalyshev <[email protected]>
Gerrit-Reviewer: jenkins-bot <>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits