Joal has uploaded a new change for review. (
https://gerrit.wikimedia.org/r/390226 )
Change subject: Add clickstream oozie job
......................................................................
Add clickstream oozie job
This new oozie job runs monthly and by default stores its
resulting datasets in
/wmf/data/archive/clickstream/YYYY-MM/wiki_db=XXXX
for the computed month and wikis.
It uses the util/spark/submit subworkflow to launch the
scala application computing the datasets (with a hive-context).
Bug: T175844
Change-Id: Ie3466e19f51f59ba3d2ea6176b9b52bf35cb34b4
---
A oozie/clickstream/README.md
A oozie/clickstream/coordinator.properties
A oozie/clickstream/coordinator.xml
A oozie/clickstream/workflow.xml
4 files changed, 349 insertions(+), 0 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery
refs/changes/26/390226/1
diff --git a/oozie/clickstream/README.md b/oozie/clickstream/README.md
new file mode 100644
index 0000000..e41d3fc
--- /dev/null
+++ b/oozie/clickstream/README.md
@@ -0,0 +1,5 @@
+Oozie job to schedule generating clickstream datasets for various projects.
+The job runs every month and its TSV results are synchronised to the public.
+
+The oozie workflow launches a spark action that runs the
+ClickstreamBuilder scala job in analytics-refinery-source/refinery-job.
diff --git a/oozie/clickstream/coordinator.properties
b/oozie/clickstream/coordinator.properties
new file mode 100644
index 0000000..497d614
--- /dev/null
+++ b/oozie/clickstream/coordinator.properties
@@ -0,0 +1,77 @@
+# Configures a coordinator to automatically generate monthly clickstream
datasets.
+# Any of the following properties are override-able with -D.
+# Usage:
+# oozie job -Duser=$USER -Dstart_time=2017-10-01T00:00Z -submit -config
oozie/clickstream/coordinator.properties
+#
+# NOTE: The $oozie_directory must be synced to HDFS so that all relevant
+# .xml files exist there when this job is submitted.
+
+name_node = hdfs://analytics-hadoop
+job_tracker = resourcemanager.analytics.eqiad.wmnet:8032
+queue_name = default
+
+#Default user
+user = hdfs
+
+# Base path in HDFS to refinery.
+# When submitting this job for production, you should override this to point
directly at a deployed
+# directory name, and not the 'symbolic' 'current' directory. E.g.
/wmf/refinery/2015-01-05T17.59.18Z--7bb7f07
+refinery_directory = ${name_node}/wmf/refinery/current
+
+# HDFS path to artifacts that will be used by this job.
+# E.g. refinery-job.jar should exist here.
+artifacts_directory = ${refinery_directory}/artifacts
+
+# Base path in HDFS to oozie files.
+# Other files will be used relative to this path.
+oozie_directory = ${refinery_directory}/oozie
+
+# HDFS path to coordinator to run.
+coordinator_file =
${oozie_directory}/clickstream/coordinator.xml
+
+# HDFS path to workflow to run.
+workflow_file = ${oozie_directory}/clickstream/workflow.xml
+
+# webrequest definitions
+webrequest_datasets_file = ${oozie_directory}/webrequest/datasets.xml
+webrequest_data_directory = ${name_node}/wmf/data/wmf/webrequest
+webrequest_table = wmf.webrequest
+
+# mediawiki raw definitions
+mw_raw_datasets_file =
${oozie_directory}/mediawiki/history/datasets_raw.xml
+mw_raw_directory = ${name_node}/wmf/data/raw/mediawiki
+mw_project_namespace_map_table = wmf_raw.mediawiki_project_namespace_map
+mw_page_table = wmf_raw.mediawiki_page
+mw_pagelinks_table = wmf_raw.mediawiki_pagelinks
+mw_redirect_table = wmf_raw.mediawiki_redirect
+
+
+# Time to start running this coordinator (first month to compute).
+start_time = 2017-10-01T00:00Z
+
+# Time to stop running this coordinator. Year 3000 == never!
+stop_time = 3000-01-01T00:00Z
+
+# Spark job parameters
+spark_master = yarn
+spark_deploy = cluster
+spark_assembly_jar =
${name_node}/user/spark/share/lib/spark-assembly.jar
+spark_app_jar =
${artifacts_directory}/org/wikimedia/analytics/refinery/refinery-job-0.0.55.jar
+spark_app_class =
org.wikimedia.analytics.refinery.job.ClickstreamBuilder
+spark_app_name = ClickstreamBuilder
+spark_executor_memory = 2G
+spark_driver_memory = 4G
+spark_max_executors = 64
+clickstream_wikis = enwiki,ruwiki,dewiki,eswiki,jawiki
+clickstream_minimum_links = 10
+clickstream_output_base_path = ${name_node}/wmf/data/archive/clickstream
+
+# Workflow to submit spark job providing a Hive context
+spark_submit_workflow_file =
${oozie_directory}/util/spark/submit/workflow.xml
+# Workflow to send an error email
+send_error_email_workflow_file =
${oozie_directory}/util/send_error_email/workflow.xml
+
+# Coordinator to start.
+oozie.coord.application.path = ${coordinator_file}
+oozie.use.system.libpath = true
+oozie.action.external.stats.write = true
\ No newline at end of file
diff --git a/oozie/clickstream/coordinator.xml
b/oozie/clickstream/coordinator.xml
new file mode 100644
index 0000000..c656be0
--- /dev/null
+++ b/oozie/clickstream/coordinator.xml
@@ -0,0 +1,102 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<coordinator-app xmlns="uri:oozie:coordinator:0.4"
+ name="clickstream-coord"
+ frequency="${coord:months(1)}"
+ start="${start_time}"
+ end="${stop_time}"
+ timezone="Universal">
+
+ <parameters>
+
+ <!-- Required properties. -->
+ <property><name>name_node</name></property>
+ <property><name>job_tracker</name></property>
+ <property><name>queue_name</name></property>
+
+ <property><name>workflow_file</name></property>
+
+ <property><name>webrequest_datasets_file</name></property>
+ <property><name>webrequest_data_directory</name></property>
+ <property><name>webrequest_table</name></property>
+
+ <property><name>mw_raw_datasets_file</name></property>
+ <property><name>mw_raw_directory</name></property>
+ <property><name>mw_project_namespace_map_table</name></property>
+ <property><name>mw_page_table</name></property>
+ <property><name>mw_pagelinks_table</name></property>
+ <property><name>mw_redirect_table</name></property>
+
+ <property><name>start_time</name></property>
+ <property><name>stop_time</name></property>
+
+ <property><name>spark_app_jar</name></property>
+ <property><name>spark_app_class</name></property>
+ <property><name>spark_app_name</name></property>
+ <property><name>spark_executor_memory</name></property>
+ <property><name>spark_executor_memory_overhead</name></property>
+ <property><name>spark_driver_memory</name></property>
+ <property><name>spark_max_executors</name></property>
+ <property><name>clickstream_wikis</name></property>
+ <property><name>clickstream_minimum_links</name></property>
+ <property><name>clickstream_output_base_path</name></property>
+
+ <property><name>spark_submit_workflow_file</name></property>
+ <property><name>send_error_email_workflow_file</name></property>
+ </parameters>
+
+ <controls>
+ <!--(timeout is measured in minutes)-->
+ <timeout>-1</timeout>
+
+ <!-- Setting low concurrency for resource sharing.
+ The job runs pretty fast (~1 minute) and increasing concurrency
should not cause any problems-->
+ <concurrency>2</concurrency>
+
+ <throttle>2</throttle>
+
+ </controls>
+
+ <datasets>
+ <include>${webrequest_datasets_file}</include>
+ <include>${mw_raw_datasets_file}</include>
+ </datasets>
+
+ <input-events>
+ <data-in name="text" dataset="webrequest_text">
+ <start-instance>${coord:current(0)}</start-instance>
+ <end-instance>${coord:current(coord:daysInMonth(0) * 24 -
1)}</end-instance>
+ </data-in>
+ <data-in name="mw_project_namespace_map"
dataset="mw_project_namespace_map">
+ <instance>${coord:current(0)}</instance>
+ </data-in>
+ <data-in name="mw_page_table" dataset="mw_page_table">
+ <instance>${coord:current(0)}</instance>
+ </data-in>
+ <data-in name="mw_pagelinks_table" dataset="mw_pagelinks_table">
+ <instance>${coord:current(0)}</instance>
+ </data-in>
+ <data-in name="mw_redirect_table" dataset="mw_redirect_table">
+ <instance>${coord:current(0)}</instance>
+ </data-in>
+ </input-events>
+
+ <action>
+ <workflow>
+ <app-path>${workflow_file}</app-path>
+ <configuration>
+ <property>
+ <name>year</name>
+ <value>${coord:formatTime(coord:nominalTime(),
"y")}</value>
+ </property>
+ <property>
+ <name>month</name>
+ <value>${coord:formatTime(coord:nominalTime(),
"M")}</value>
+ </property>
+ <property>
+ <name>snapshot</name>
+ <value>${coord:formatTime(coord:nominalTime(),
"yyyy")}-${coord:formatTime(coord:nominalTime(), "MM")}</value>
+ </property>
+ </configuration>
+ </workflow>
+ </action>
+</coordinator-app>
\ No newline at end of file
diff --git a/oozie/clickstream/workflow.xml b/oozie/clickstream/workflow.xml
new file mode 100644
index 0000000..022a19b
--- /dev/null
+++ b/oozie/clickstream/workflow.xml
@@ -0,0 +1,165 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<workflow-app xmlns="uri:oozie:workflow:0.4"
+ name="clickstream-wf-${year}-${month}">
+
+ <parameters>
+
+ <!-- Default values for inner oozie settings -->
+ <property>
+ <name>oozie_launcher_queue_name</name>
+ <value>${queue_name}</value>
+ </property>
+ <property>
+ <name>oozie_launcher_memory</name>
+ <value>2048</value>
+ </property>
+
+ <!-- Required properties -->
+ <property><name>name_node</name></property>
+ <property><name>job_tracker</name></property>
+ <property><name>queue_name</name></property>
+
+ <property>
+ <name>spark_app_jar</name>
+ <description>Path to the jar to be used to run spark
application</description>
+ </property>
+ <property>
+ <name>spark_app_class</name>
+ <description>Class of the spark application to be run</description>
+ </property>
+ <property>
+ <name>spark_app_name</name>
+ <description>The spark application name</description>
+ </property>
+ <property>
+ <name>spark_executor_memory</name>
+ <description>Memory to allocate for each spark
executor</description>
+ </property>
+ <property>
+ <name>spark_executor_memory_overhead</name>
+ <description>Memory-overhead to allocate for each spark
executor</description>
+ </property>
+ <property>
+ <name>spark_driver_memory</name>
+ <description>Memory to allocate for spark driver
process</description>
+ </property>
+ <property>
+ <name>spark_max_executors</name>
+ <description>Maximum concurrent number of executors for spark
dynamic allocation</description>
+ </property>
+
+ <property>
+ <name>webrequest_table</name>
+ <description>The refined webrequest table</description>
+ </property>
+ <property>
+ <name>mw_project_namespace_map_table</name>
+ <description>The mediawiki project namespace map table</description>
+ </property>
+ <property>
+ <name>mw_page_table</name>
+ <description>The mediawiki page table</description>
+ </property>
+ <property>
+ <name>mw_pagelinks_table</name>
+ <description>The mediawiki pagelinks table</description>
+ </property>
+ <property>
+ <name>mw_redirect_table</name>
+ <description>The mediawiki redirect table</description>
+ </property>
+ <property>
+ <name>year</name>
+ <description>The webrequest partition's year</description>
+ </property>
+ <property>
+ <name>month</name>
+ <description>The webrequest partition's month</description>
+ </property>
+ <property>
+ <name>snapshot</name>
+ <description>The snapshot to use for mediawiki raw
data (YYYY-MM)</description>
+ </property>
+ <property>
+ <name>clickstream_wikis</name>
+ <description>Wiki projects to be worked (comma separated wiki_db
list)</description>
+ </property>
+ <property>
+ <name>clickstream_minimum_links</name>
+ <description>Minimum number of times a link needs to have been
followed to be in the resulting dataset</description>
+ </property>
+ <property>
+ <name>clickstream_output_base_path</name>
+ <description>Basepath where to output datasets</description>
+ </property>
+ <property>
+ <name>spark_submit_workflow_file</name>
+ <description>Workflow submitting spark jobs with access to hive
context</description>
+ </property>
+ <property>
+ <name>send_error_email_workflow_file</name>
+ <description>Workflow for sending an email</description>
+ </property>
+ </parameters>
+
+ <start to="generate_clickstream_datasets"/>
+
+ <action name="generate_clickstream_datasets">
+ <sub-workflow>
+ <app-path>${spark_submit_workflow_file}</app-path>
+ <propagate-configuration/>
+ <configuration>
+ <!-- Propagated properties :
+ name_node
+ job_tracker
+ queue_name
+ spark_app_jar
+ spark_app_class
+ spark_app_name
+ spark_driver_memory
+ spark_executor_memory
+ spark_executor_memory_overhead
+ spark_max_executors
+ -->
+ <property>
+ <name>spark_app_options</name>
+ <value>--snapshot ${snapshot} --year ${year} --month
${month} --minimum-count ${clickstream_minimum_links} --output-base-path
${clickstream_output_base_path} --wikis ${clickstream_wikis} --webrequest-table
${webrequest_table} --project-namespace-table ${mw_project_namespace_map_table}
--page-table ${mw_page_table} --redirect-table ${mw_redirect_table}
--pagelinks-table ${mw_pagelinks_table}</value>
+ </property>
+ </configuration>
+ </sub-workflow>
+ <ok to="end" />
+ <error to="send_error_email" />
+ </action>
+
+ <action name="send_error_email">
+ <sub-workflow>
+ <app-path>${send_error_email_workflow_file}</app-path>
+ <propagate-configuration/>
+ <configuration>
+ <property>
+ <name>parent_name</name>
+ <value>${wf:name()}</value>
+ </property>
+ <property>
+ <name>parent_failed_action</name>
+ <value>${wf:lastErrorNode()}</value>
+ </property>
+ <property>
+ <name>parent_error_code</name>
+ <value>${wf:errorCode(wf:lastErrorNode())}</value>
+ </property>
+ <property>
+ <name>parent_error_message</name>
+ <value>${wf:errorMessage(wf:lastErrorNode())}</value>
+ </property>
+ </configuration>
+ </sub-workflow>
+ <ok to="kill"/>
+ <error to="kill"/>
+ </action>
+
+ <kill name="kill">
+ <message>Action failed, error
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+ </kill>
+ <end name="end"/>
+</workflow-app>
--
To view, visit https://gerrit.wikimedia.org/r/390226
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie3466e19f51f59ba3d2ea6176b9b52bf35cb34b4
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: Joal <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits