Smalyshev has submitted this change and it was merged.

Change subject: Setup oozie workflow for transferToES.py
......................................................................


Setup oozie workflow for transferToES.py

Tested the pipeline and this appropriately kicks off the transferToES
script. Was tested with a few custom parameters pointing everything
into /user/ebernhardson, will need to retest with the production
values.

Change-Id: I45f93e22dc7d4a9cdf9e2f5fd26f2eb3c248f44e
---
M oozie/datasets.xml
A oozie/transfer_to_es/README.md
R oozie/transfer_to_es/all.dblist
A oozie/transfer_to_es/bundle.properties
A oozie/transfer_to_es/bundle.xml
A oozie/transfer_to_es/coordinator.xml
R oozie/transfer_to_es/hostmap.json
A oozie/transfer_to_es/lib/.gitkeep
R oozie/transfer_to_es/makeHostnames.py
R oozie/transfer_to_es/transferToES.py
R oozie/transfer_to_es/transferToFile.py
A oozie/transfer_to_es/workflow.xml
12 files changed, 389 insertions(+), 2 deletions(-)

Approvals:
  Smalyshev: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/oozie/datasets.xml b/oozie/datasets.xml
index c90131d..68c7ec2 100644
--- a/oozie/datasets.xml
+++ b/oozie/datasets.xml
@@ -28,7 +28,7 @@
              frequency="${coord:days(days_aggregated)}"
              initial-instance="${start_time}"
              timezone="Universal">
-        
<uri-template>${discovery_data_directory}/popularity_score/agg_days=${days_aggregated}/year=${YEAR}/month=${"$"}{MONTH
 + 0}/day=${"$"}{DAY + 0}</uri-template>
+        
<uri-template>${popularity_score_data_directory}/agg_days=${days_aggregated}/year=${YEAR}/month=${"$"}{MONTH
 + 0}/day=${"$"}{DAY + 0}</uri-template>
         <done-flag>_SUCCESS</done-flag>
     </dataset>
 </datasets>
diff --git a/oozie/transfer_to_es/README.md b/oozie/transfer_to_es/README.md
new file mode 100644
index 0000000..347cf7f
--- /dev/null
+++ b/oozie/transfer_to_es/README.md
@@ -0,0 +1,18 @@
+# Transfer phase of discovery popularity score to elasticsearch clusters
+
+This job is reposible for transfering popularity score information from
+the hadoop cluster to the elasticsearch cluster.
+
+# Outline
+
+* ```bundle.properties``` is used to define parameters to the pipeline
+* ```bundle.xml``` Defines a separate import job for each active cluster
+* ```coordinator.xml``` Defines when and how to run the import job
+* ```workflow.xml```
+  * Runs a spark app to read in the scoring information from 
+    discovery.popularity_score and write it to the specified elasticsearch
+    URL.
+
+Note that this job uses the popularity_score dataset. If the aggregation job
+does not have the _SUCCESS done-flag in the directory, the scores for that
+week will not be transfered until it does.
diff --git a/all.dblist b/oozie/transfer_to_es/all.dblist
similarity index 100%
rename from all.dblist
rename to oozie/transfer_to_es/all.dblist
diff --git a/oozie/transfer_to_es/bundle.properties 
b/oozie/transfer_to_es/bundle.properties
new file mode 100644
index 0000000..8c9286e
--- /dev/null
+++ b/oozie/transfer_to_es/bundle.properties
@@ -0,0 +1,68 @@
+# Configures a coordinator to manage automatically exporting
+# popularity_score to elasticsearch.
+#
+# Any of the following properties are overidable with -D.
+# Usage:
+#   oozie job -Duser=$USER -Dstart_time=2015-12-01T00:00Z -submit \
+#       -config oozie/transfer_to_es/bundle.properties
+#
+# NOTE:  Both *_oozie_directory must be synced to HDFS so that all relevant
+#        .xml files exist there when this job is submitted.
+
+# Base path in HDFS to this repository oozie files.
+# Other files will be used relative to this path.
+discovery_oozie_directory         = ${name_node}/wmf/discovery/current/oozie
+
+# Base path in HDFS to the analytics team oozie files.
+# Other files will be used relative to this path
+analytics_oozie_directory         = ${name_node}/wmf/refinery/current/oozie
+
+name_node                         = hdfs://analytics-hadoop
+job_tracker                       = resourcemanager.analytics.eqiad.wmnet:8032
+queue_name                        = default
+
+user                              = hdfs
+
+# HDFS path to coordinator to run the transfer_to_es export.
+coordinator_file                  = 
${discovery_oozie_directory}/transfer_to_es/coordinator.xml
+
+# HDFS path to workflow to run the transfer_to_es export.
+workflow_file                     = 
${discovery_oozie_directory}/transfer_to_es/workflow.xml
+
+# HDFS path to popularity score dataset definitions
+discovery_datasets_file           = ${discovery_oozie_directory}/datasets.xml
+discovery_data_directory          = ${name_node}/wmf/data/wmf/discovery
+popularity_score_data_directory   = 
${discovery_data_directory}/popularity_score
+popularity_score_table            = discovery.popularity_score
+
+# Number of days page views are aggregated over.
+days_aggregated                   = 7
+
+# Number of updates to send in each batch to elasticsearch
+transfer_to_es_batch_size         = 1000
+
+# Initial import time of the popularity score dataset. This is one week after 
the page_id
+# field was added to pageview_hourly.
+start_time                        = 2015-12-08T00:00Z
+
+# Time to stop running this coordinator.  Year 3000 == never!
+stop_time                         = 3000-01-01T00:00Z
+
+# Workflow to send an error email
+send_error_email_workflow_file    = 
${analytics_oozie_directory}/util/send_error_email/workflow.xml
+
+# Spark assembly jar path needs to be given to spark conf
+spark_assembly_jar                = 
${name_node}/user/spark/share/lib/spark-assembly.jar
+
+# Spark load settings - Better to set them here, configurable through oozie
+# when spark alocates this resource. This partially controls the amount
+# of concurrency between the hadoop cluster and the elasticsearch cluster
+# during export.
+spark_number_executors            = 3
+spark_executor_memory             = 1G
+spark_driver_memory               = 1G
+
+# Coordintator to start.
+oozie.bundle.application.path     = 
${discovery_oozie_directory}/transfer_to_es/bundle.xml
+oozie.use.system.libpath          = true
+oozie.action.external.stats.write = true
diff --git a/oozie/transfer_to_es/bundle.xml b/oozie/transfer_to_es/bundle.xml
new file mode 100644
index 0000000..c8223ec
--- /dev/null
+++ b/oozie/transfer_to_es/bundle.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<bundle-app xmlns="uri:oozie:bundle:0.2"
+    name="transfer_to_es-bundle">
+
+    <parameters>
+
+        <!-- Required properties -->
+        <property><name>queue_name</name></property>
+        <property><name>coordinator_file</name></property>
+        <property><name>name_node</name></property>
+        <property><name>job_tracker</name></property>
+
+        <property><name>start_time</name></property>
+        <property><name>stop_time</name></property>
+    </parameters>
+
+    <coordinator name="transfer_to_es-eqiad-coord">
+        <app-path>${coordinator_file}</app-path>
+        <configuration>
+            <property>
+                <name>elasticsearch_url</name>
+                <value>http://search.svc.eqiad.wmnet</value>
+            </property>
+        </configuration>
+    </coordinator>
+
+    <coordinator name="transfer_to_es-codfw-coord">
+        <app-path>${coordinator_file}</app-path>
+        <configuration>
+            <property>
+                <name>elasticsearch_url</name>
+                <value>http://search.svc.codfw.wmnet</value>
+            </property>
+        </configuration>
+    </coordinator>
+
+</bundle-app>
diff --git a/oozie/transfer_to_es/coordinator.xml 
b/oozie/transfer_to_es/coordinator.xml
new file mode 100644
index 0000000..4a8edfe
--- /dev/null
+++ b/oozie/transfer_to_es/coordinator.xml
@@ -0,0 +1,100 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<coordinator-app xmlns="uri:oozie:coordinator:0.4"
+    name="transfer_to_es-coord"
+    frequency="${coord:days(days_aggregated)}"
+    start="${start_time}"
+    end="${stop_time}"
+    timezone="Universal">
+
+    <parameters>
+
+        <!-- Required properties. -->
+        <property><name>workflow_file</name></property>
+        <property><name>start_time</name></property>
+        <property><name>stop_time</name></property>
+
+        <property><name>discovery_data_directory</name></property>
+        <property><name>discovery_datasets_file</name></property>
+        <property><name>popularity_score_data_directory</name></property>
+        <property><name>popularity_score_table</name></property>
+
+        <property><name>days_aggregated</name></property>
+
+        <property><name>queue_name</name></property>
+        <property><name>name_node</name></property>
+        <property><name>job_tracker</name></property>
+
+        <property><name>spark_number_executors</name></property>
+        <property><name>spark_executor_memory</name></property>
+        <property><name>spark_driver_memory</name></property>
+
+        <property><name>send_error_email_workflow_file</name></property>
+
+        <property><name>discovery_oozie_directory</name></property>
+
+        <property><name>transfer_to_es_batch_size</name></property>
+        <property><name>elasticsearch_url</name></property>
+    </parameters>
+
+    <controls>
+        <!--
+        If a materialized job sits around for too long we could end up
+        doing the export during the busiest part of the day for the ES
+        servers. Rather than risking an overload of the servers timeout
+        the job after two hours.
+        -->
+        <timeout>120</timeout>
+
+        <!--
+        transfering these documents can load up elasticsearch, additionally
+        one instance running would overwrite the work of another instance
+        running at the same time. Limit concurrency to prevent this. The
+        timeout setting combined with once a week runs should prevent this
+        anyways, but this is set for safety sake.
+        -->
+        <concurrency>1</concurrency>
+
+        <!--
+        Since we expect only one incarnation per weekly dataset, the
+        default throttle of 12 is way to high, and there is not need
+        to keep that many materialized jobs around.
+        -->
+        <throttle>1</throttle>
+    </controls>
+
+    <datasets>
+        <!--
+        Include discovery datasets files.
+        $discovery_datasets_file will be used as the output events
+        -->
+        <include>${discovery_datasets_file}</include>
+    </datasets>
+
+    <input-events>
+        <data-in name="popularity_score_input" dataset="popularity_score">
+            <instance>${coord:current(0)}</instance>
+        </data-in>
+    </input-events>
+
+    <action>
+        <workflow>
+            <app-path>${workflow_file}</app-path>
+            <configuration>
+
+                <property>
+                    <name>year</name>
+                    <value>${coord:formatTime(coord:nominalTime(), 
"y")}</value>
+                </property>
+                <property>
+                    <name>month</name>
+                    <value>${coord:formatTime(coord:nominalTime(), 
"M")}</value>
+                </property>
+                <property>
+                    <name>day</name>
+                    <value>${coord:formatTime(coord:nominalTime(), 
"d")}</value>
+                </property>
+
+            </configuration>
+        </workflow>
+    </action>
+</coordinator-app>
diff --git a/hostmap.json b/oozie/transfer_to_es/hostmap.json
similarity index 100%
rename from hostmap.json
rename to oozie/transfer_to_es/hostmap.json
diff --git a/oozie/transfer_to_es/lib/.gitkeep 
b/oozie/transfer_to_es/lib/.gitkeep
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/oozie/transfer_to_es/lib/.gitkeep
diff --git a/makeHostnames.py b/oozie/transfer_to_es/makeHostnames.py
similarity index 100%
rename from makeHostnames.py
rename to oozie/transfer_to_es/makeHostnames.py
diff --git a/transferToES.py b/oozie/transfer_to_es/transferToES.py
similarity index 94%
rename from transferToES.py
rename to oozie/transfer_to_es/transferToES.py
index 89218e0..4a29bcb 100644
--- a/transferToES.py
+++ b/oozie/transfer_to_es/transferToES.py
@@ -6,6 +6,7 @@
 import requests
 import json
 import logging
+import subprocess
 
 oparser = OptionParser()
 oparser.add_option("-s", "--source", dest="source", help="source for the 
data", metavar="SOURCE")
@@ -19,7 +20,10 @@
 ITEMS_PER_BATCH = int(options.batch)
 SOURCE = options.source
 TARGET = options.url
-hostMap = json.load(open(options.hostmap))
+if options.hostmap[0:24] == 'hdfs://analytics-hadoop/':
+    hostMap = json.loads(subprocess.check_output(["hdfs", "dfs", "-cat", 
options.hostmap[23:]]))
+else:
+    hostMap = json.load(open(options.hostmap))
 
 print "Transferring from %s to %s" % (SOURCE, TARGET)
 
diff --git a/transferToFile.py b/oozie/transfer_to_es/transferToFile.py
similarity index 100%
rename from transferToFile.py
rename to oozie/transfer_to_es/transferToFile.py
diff --git a/oozie/transfer_to_es/workflow.xml 
b/oozie/transfer_to_es/workflow.xml
new file mode 100644
index 0000000..51679b5
--- /dev/null
+++ b/oozie/transfer_to_es/workflow.xml
@@ -0,0 +1,160 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<workflow-app xmlns="uri:oozie:workflow:0.4"
+    
name="discovery-transfer_to_es-${popularity_score_table}-${year},${month},${day}->${elasticsearch_url}-wf">
+
+    <parameters>
+
+        <!-- Default values for inner oozie settings -->
+        <property>
+            <name>oozie_launcher_queue_name</name>
+            <value>${queue_name}</value>
+        </property>
+        <property>
+            <name>oozie_launcher_memory</name>
+            <value>256</value>
+        </property>
+
+        <!-- Required properties -->
+        <property><name>queue_name</name></property>
+        <property><name>name_node</name></property>
+        <property><name>job_tracker</name></property>
+
+        <property>
+            <name>spark_assembly_jar</name>
+            <description>HDFS path to spark-assembly.jar</description>
+        </property>
+        <property>
+            <name>spark_number_executors</name>
+            <description>Number of executors to run job with</description>
+        </property>
+        <property>
+            <name>spark_executor_memory</name>
+            <description>Amount of memory to reserve on executor 
instances</description>
+        </property>
+        <property>
+            <name>spark_driver_memory</name>
+            <description>Amount of memory to reserve on driver 
instance</description>
+        </property>
+
+        <property>
+            <name>popularity_score_table</name>
+            <description>Fully qualified name of the popularity_score table in 
hive</description>
+        </property>
+        <property>
+            <name>popularity_score_data_directory</name>
+            <description>The popularity_score directory path in 
HDFS</description>
+        </property>
+        <property>
+            <name>popularity_score_partition_directory</name>
+            
<value>${popularity_score_data_directory}/agg_days=${days_aggregated}/year=${year}/month=${month}/day=${day}</value>
+        </property>
+
+        <property>
+            <name>days_aggregated</name>
+            <description>Number of days to aggregate page views 
over</description>
+        </property>
+
+        <property>
+            <name>year</name>
+            <description>Exlusive four digit year portion of date to end 
aggregation on</description>
+        </property>
+        <property>
+            <name>month</name>
+            <description>Exclusive unpadded month portion of date to end 
aggregation on</description>
+        </property>
+        <property>
+            <name>day</name>
+            <description>Exclusive unpadded day portion of date to end 
aggregation on</description>
+        </property>
+
+        <property>
+            <name>send_error_email_workflow_file</name>
+            <description>Workflow for sending an email</description>
+        </property>
+
+        <property>
+            <name>discovery_oozie_directory</name>
+            <description>The path to this repositories oozie directory in 
HDFS</description>
+        </property>
+
+        <property>
+            <name>pyspark_transfer_to_es_script</name>
+            
<value>${discovery_oozie_directory}/transfer_to_es/transferToES.py</value>
+            <description>Path to Pyspark script to run for generating the 
popularity score in HDFS.</description>
+        </property>
+        <property>
+            <name>transfer_to_es_hostmap_file</name>
+            
<value>${discovery_oozie_directory}/transfer_to_es/hostmap.json</value>
+            <description>Path to JSON hostmap for domain -> wiki conversion in 
HDFS</description>
+        </property>
+        <property>
+            <name>transfer_to_es_batch_size</name>
+            <description>Items per batch to load into ES</description>
+        </property>
+        <property>
+            <name>elasticsearch_url</name>
+            <description>URL to send the data to with no trailing 
/</description>
+        </property>
+
+    </parameters>
+
+    <start to="transfer"/>
+
+    <action name="transfer">
+        <spark xmlns="uri:oozie:spark-action:0.1">
+            <job-tracker>${job_tracker}</job-tracker>
+            <name-node>${name_node}</name-node>
+            <configuration>
+                <property>
+                    <name>mapreduce.job.queuename</name>
+                    <value>${queue_name}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.queue.name</name>
+                    <value>${oozie_launcher_queue_name}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapreduce.map.memory.mb</name>
+                    <value>${oozie_launcher_memory}</value>
+                </property>
+            </configuration>
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>Discovery Transfer To Elasticsearch</name>
+            <jar>${pyspark_transfer_to_es_script}</jar>
+            <spark-opts>--conf spark.yarn.jar=${spark_assembly_jar} 
--executor-memory ${spark_executor_memory} --driver-memory 
${spark_driver_memory} --num-executors ${spark_number_executors} --queue 
${queue_name} --conf spark.yarn.appMasterEnv.SPARK_HOME=/bogus</spark-opts>
+            <!-- arguments for pyspark script -->
+            <arg>--source</arg>
+            <arg>${popularity_score_partition_directory}</arg>
+            <arg>--url</arg>
+            <arg>${elasticsearch_url}</arg>
+            <arg>--batch</arg>
+            <arg>${transfer_to_es_batch_size}</arg>
+            <arg>--hostmap</arg>
+            <arg>${transfer_to_es_hostmap_file}</arg>
+        </spark>
+
+        <ok to="end"/>
+        <error to="kill"/>
+    </action>
+
+    <action name="send_error_email">
+        <sub-workflow>
+            <app-path>${send_error_email_workflow_file}</app-path>
+            <propagate-configuration/>
+            <configuration>
+                <property>
+                    <name>parent_name</name>
+                    <value>${wf:name()}</value>
+                </property>
+            </configuration>
+        </sub-workflow>
+        <ok to="kill"/>
+        <error to="kill"/>
+    </action>
+
+    <kill name="kill">
+        <message>Action failed, error 
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+    <end name="end"/>
+</workflow-app>

-- 
To view, visit https://gerrit.wikimedia.org/r/257436
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I45f93e22dc7d4a9cdf9e2f5fd26f2eb3c248f44e
Gerrit-PatchSet: 11
Gerrit-Project: wikimedia/discovery/analytics
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <[email protected]>
Gerrit-Reviewer: DCausse <[email protected]>
Gerrit-Reviewer: EBernhardson <[email protected]>
Gerrit-Reviewer: Joal <[email protected]>
Gerrit-Reviewer: Smalyshev <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to