Joal has uploaded a new change for review.
https://gerrit.wikimedia.org/r/274187
Change subject: Add archive stage to last_access_uniques jobs
......................................................................
Add archive stage to last_access_uniques jobs
Aggregate daily and monthly uniques on uri_host and
keep only hosts having at least 1000 uniques.
Archive in hdfs:///wmf/data/archive/unique_devices/{year}/{year}-{month}
Bug: T126767
Change-Id: I2b05e289ef2593d7a0edbb51b6b6b2fd41ba8d01
---
M oozie/last_access_uniques/daily/coordinator.properties
M oozie/last_access_uniques/daily/coordinator.xml
A oozie/last_access_uniques/daily/last_access_uniques_daily_to_archive.hql
M oozie/last_access_uniques/daily/workflow.xml
M oozie/last_access_uniques/monthly/coordinator.properties
M oozie/last_access_uniques/monthly/coordinator.xml
M oozie/last_access_uniques/monthly/last_access_uniques_monthly.hql
A oozie/last_access_uniques/monthly/last_access_uniques_monthly_to_archive.hql
M oozie/last_access_uniques/monthly/workflow.xml
9 files changed, 333 insertions(+), 27 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery
refs/changes/87/274187/1
diff --git a/oozie/last_access_uniques/daily/coordinator.properties
b/oozie/last_access_uniques/daily/coordinator.properties
index 56fb0ac..7b83337 100644
--- a/oozie/last_access_uniques/daily/coordinator.properties
+++ b/oozie/last_access_uniques/daily/coordinator.properties
@@ -1,5 +1,6 @@
# Configures a coordinator to generate daily uniques using last_access method
-# on webrequest table.
+# on webrequest table and save a reduced version (uri_host aggregates with
+# at least 1000 uniques) in archive folders.
# Any of the following properties are overridable with -D.
# Usage:
# oozie job -Duser=$USER -Dstart_time=2015-01-05T00:00Z -submit -config
oozie/last_access_uniques/daily/coordinator.properties
@@ -22,10 +23,11 @@
# Sub workflows path
mark_directory_done_workflow_file =
${oozie_directory}/util/mark_directory_done/workflow.xml
+archive_job_output_workflow_file =
${oozie_directory}/util/archive_job_output/workflow.xml
send_error_email_workflow_file =
${oozie_directory}/util/send_error_email/workflow.xml
-source_table = wmf.webrequest
-destination_table = wmf.last_access_uniques_daily
+webrequest_table = wmf.webrequest
+last_access_uniques_daily_table = wmf.last_access_uniques_daily
# HDFS path to webrequest dataset definition
webrequest_data_directory = ${name_node}/wmf/data/wmf/webrequest
@@ -38,6 +40,14 @@
user = hdfs
workflow_file =
${oozie_directory}/last_access_uniques/daily/workflow.xml
+temporary_directory = ${name_node}/tmp
+
+# Archive base directory
+archive_directory = ${name_node}/wmf/data/archive
+
+# Archive directory for webrequest data
+uniques_archive_directory = ${archive_directory}/unique_devices
+
# Coordinator app to run.
oozie.use.system.libpath = true
oozie.action.external.stats.write = true
diff --git a/oozie/last_access_uniques/daily/coordinator.xml
b/oozie/last_access_uniques/daily/coordinator.xml
index 09b5f14..7764c5b 100644
--- a/oozie/last_access_uniques/daily/coordinator.xml
+++ b/oozie/last_access_uniques/daily/coordinator.xml
@@ -10,18 +10,22 @@
<property><name>name_node</name></property>
<property><name>job_tracker</name></property>
<property><name>mark_directory_done_workflow_file</name></property>
+ <property><name>archive_job_output_workflow_file</name></property>
<property><name>send_error_email_workflow_file</name></property>
<property><name>workflow_file</name></property>
<property><name>start_time</name></property>
<property><name>stop_time</name></property>
- <property><name>source_table</name></property>
- <property><name>destination_table</name></property>
+ <property><name>webrequest_table</name></property>
+ <property><name>last_access_uniques_daily_table</name></property>
<property><name>webrequest_data_directory</name></property>
<property><name>webrequest_datasets_file</name></property>
<property><name>last_access_uniques_data_directory</name></property>
<property><name>last_access_uniques_datasets_file</name></property>
+
+ <property><name>temporary_directory</name></property>
+ <property><name>uniques_archive_directory</name></property>
</parameters>
<controls>
@@ -76,7 +80,7 @@
<value>${coord:formatTime(coord:nominalTime(), "d")}</value>
</property>
<property>
- <name>destination_dataset_directory</name>
+ <name>uniques_dataset_directory</name>
<value>${coord:dataOut('last_access_uniques_daily_output')}</value>
</property>
</configuration>
diff --git
a/oozie/last_access_uniques/daily/last_access_uniques_daily_to_archive.hql
b/oozie/last_access_uniques/daily/last_access_uniques_daily_to_archive.hql
new file mode 100644
index 0000000..2efc6c9
--- /dev/null
+++ b/oozie/last_access_uniques/daily/last_access_uniques_daily_to_archive.hql
@@ -0,0 +1,48 @@
+-- Aggregate daily uniques on uri_host and
+-- keep only hosts having at least 1000 uniques daily.
+--
+-- Parameters:
+-- source_table -- Table containing source data
+-- destination_directory -- Directory where to write newly computed data
+-- year -- year of the to-be-generated
+-- month -- month of the to-be-generated
+-- day -- day of the to-be-generated
+--
+-- Usage:
+-- hive -f last_access_uniques_daily_to_archive.hql \
+-- -d source_table=wmf.last_access_uniques_daily \
+-- -d destination_directory=/tmp/archive/last_access_uniques_daily \
+-- -d year=2016 \
+-- -d month=1 \
+-- -d day=1
+
+
+-- Set compression codec to gzip to provide asked format
+SET hive.exec.compress.output=true;
+SET
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
+
+
+INSERT OVERWRITE DIRECTORY "${destination_directory}"
+ -- Since "ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '" only
+ -- works for exports to local directories (see HIVE-5672), we have to
+ -- prepare the lines by hand through concatenation :-(
+ -- Set 0 as volume column since we don't use it.
+ SELECT
+ CONCAT_WS('\t', uri_host, cast(uniques_estimate AS string)) line
+ FROM (
+ SELECT
+ uri_host,
+ SUM(uniques_estimate) as uniques_estimate
+ FROM ${source_table}
+ WHERE year=${year}
+ AND month=${month}
+ AND day=${day}
+ GROUP BY
+ uri_host
+ HAVING
+ SUM(uniques_estimate) >= 1000
+ ORDER BY
+ uniques_estimate DESC
+ LIMIT 100000000
+ ) uniques_transformed
+;
diff --git a/oozie/last_access_uniques/daily/workflow.xml
b/oozie/last_access_uniques/daily/workflow.xml
index d875f18..73f0f65 100644
--- a/oozie/last_access_uniques/daily/workflow.xml
+++ b/oozie/last_access_uniques/daily/workflow.xml
@@ -22,12 +22,12 @@
</property>
<!-- specifying parameter values in file to test running -->
<property>
- <name>source_table</name>
+ <name>webrequest_table</name>
<description>Hive table to read data from.</description>
</property>
<property>
- <name>destination_table</name>
- <description>The destinaton table to store uniques data
in.</description>
+ <name>last_access_uniques_daily_table</name>
+ <description>The table to store uniques data in.</description>
</property>
<property>
<name>year</name>
@@ -42,8 +42,16 @@
<description>The partition's day</description>
</property>
<property>
+ <name>uniques_dataset_directory</name>
+ <description>Directory where the hive data is stored</description>
+ </property>
+ <property>
<name>mark_directory_done_workflow_file</name>
<description>Workflow for marking a directory done</description>
+ </property>
+ <property>
+ <name>archive_job_output_workflow_file</name>
+ <description>Workflow for archiving result files</description>
</property>
<property>
<name>send_error_email_workflow_file</name>
@@ -83,8 +91,8 @@
</property>
</configuration>
<script>last_access_uniques_daily.hql</script>
- <param>source_table=${source_table}</param>
- <param>destination_table=${destination_table}</param>
+ <param>source_table=${webrequest_table}</param>
+ <param>destination_table=${last_access_uniques_daily_table}</param>
<param>year=${year}</param>
<param>month=${month}</param>
<param>day=${day}</param>
@@ -99,7 +107,93 @@
<configuration>
<property>
<name>directory</name>
- <value>${destination_dataset_directory}</value>
+ <value>${uniques_dataset_directory}</value>
+ </property>
+ </configuration>
+ </sub-workflow>
+ <ok to="transform_to_archive"/>
+ <error to="send_error_email"/>
+ </action>
+
+ <action name="transform_to_archive">
+ <hive xmlns="uri:oozie:hive-action:0.2">
+ <job-tracker>${job_tracker}</job-tracker>
+ <name-node>${name_node}</name-node>
+ <job-xml>${hive_site_xml}</job-xml>
+ <configuration>
+ <property>
+ <name>mapreduce.job.queuename</name>
+ <value>${queue_name}</value>
+ </property>
+ <!--make sure oozie:launcher runs in a low priority queue -->
+ <property>
+ <name>oozie.launcher.mapred.job.queue.name</name>
+ <value>${oozie_launcher_queue_name}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapreduce.map.memory.mb</name>
+ <value>${oozie_launcher_memory}</value>
+ </property>
+ <!--Let hive decide on the number of reducers -->
+ <property>
+ <name>mapred.reduce.tasks</name>
+ <value>-1</value>
+ </property>
+ <property>
+ <name>hive.exec.scratchdir</name>
+ <value>/tmp/hive-${user}</value>
+ </property>
+ </configuration>
+
+ <script>last_access_uniques_daily_to_archive.hql</script>
+ <!-- Here, the source for archive is the
+ destination of the previous job -->
+ <param>source_table=${last_access_uniques_daily_table}</param>
+ <param>year=${year}</param>
+ <param>month=${month}</param>
+ <param>day=${day}</param>
+
<param>destination_directory=${temporary_directory}/${wf:id()}</param>
+ </hive>
+
+ <ok to="mark_transformed_uniques_dataset_done"/>
+ <error to="send_error_email"/>
+ </action>
+
+ <action name="mark_transformed_uniques_dataset_done">
+ <sub-workflow>
+ <app-path>${mark_directory_done_workflow_file}</app-path>
+ <configuration>
+ <property>
+ <name>directory</name>
+ <value>${temporary_directory}/${wf:id()}</value>
+ </property>
+ </configuration>
+ </sub-workflow>
+ <ok to="move_data_to_archive"/>
+ <error to="send_error_email"/>
+ </action>
+
+ <action name="move_data_to_archive">
+ <sub-workflow>
+ <app-path>${archive_job_output_workflow_file}</app-path>
+ <propagate-configuration/>
+ <configuration>
+ <property>
+ <name>source_directory</name>
+ <value>${temporary_directory}/${wf:id()}</value>
+ </property>
+ <property>
+ <name>expected_filename_ending</name>
+ <value>.gz</value>
+ </property>
+ <property>
+ <name>archive_file</name>
+ <!--
+ webstatscollector used the end of the collection period as
+ timestamp in the filename. To not break scripts of people,
+ we also name files that way.
+ -->
+
<value>${uniques_archive_directory}/${year}/${year}-${month}/unique_devices_daily-${year}-${month}-${day}.gz</value>
</property>
</configuration>
</sub-workflow>
diff --git a/oozie/last_access_uniques/monthly/coordinator.properties
b/oozie/last_access_uniques/monthly/coordinator.properties
index d1adc82..b2701ae 100644
--- a/oozie/last_access_uniques/monthly/coordinator.properties
+++ b/oozie/last_access_uniques/monthly/coordinator.properties
@@ -1,6 +1,6 @@
-
# Configures a coordinator to generate monthly uniques using last_access method
-# on webrequest table.
+# on webrequest table and save a reduced version (uri_host aggregates with
+# at least 1000 uniques) in archive folders.
# Any of the following properties are overridable with -D.
# Usage:
# oozie job -Duser=$USER -Dstart_time=2015-01-05T00:00Z -submit -config
oozie/last_access_uniques/daily/coordinator.properties
@@ -22,10 +22,11 @@
# Sub workflows path
mark_directory_done_workflow_file =
${oozie_directory}/util/mark_directory_done/workflow.xml
+archive_job_output_workflow_file =
${oozie_directory}/util/archive_job_output/workflow.xml
send_error_email_workflow_file =
${oozie_directory}/util/send_error_email/workflow.xml
-source_table = wmf.webrequest
-destination_table = wmf.last_access_uniques_monthly
+webrequest_table = wmf.webrequest
+last_access_uniques_monthly_table = wmf.last_access_uniques_monthly
# HDFS path to webrequest dataset definition
webrequest_data_directory = ${name_node}/wmf/data/wmf/webrequest
@@ -35,6 +36,14 @@
last_access_uniques_data_directory =
${name_node}/wmf/data/wmf/last_access_uniques
last_access_uniques_datasets_file =
${oozie_directory}/last_access_uniques/datasets.xml
+temporary_directory = ${name_node}/tmp
+
+# Archive base directory
+archive_directory = ${name_node}/wmf/data/archive
+
+# Archive directory for webrequest data
+uniques_archive_directory = ${archive_directory}/unique_devices
+
user = hdfs
workflow_file =
${oozie_directory}/last_access_uniques/monthly/workflow.xml
diff --git a/oozie/last_access_uniques/monthly/coordinator.xml
b/oozie/last_access_uniques/monthly/coordinator.xml
index eaad896..d5b8071 100644
--- a/oozie/last_access_uniques/monthly/coordinator.xml
+++ b/oozie/last_access_uniques/monthly/coordinator.xml
@@ -10,18 +10,22 @@
<property><name>name_node</name></property>
<property><name>job_tracker</name></property>
<property><name>mark_directory_done_workflow_file</name></property>
+ <property><name>archive_job_output_workflow_file</name></property>
<property><name>send_error_email_workflow_file</name></property>
<property><name>workflow_file</name></property>
<property><name>start_time</name></property>
<property><name>stop_time</name></property>
- <property><name>source_table</name></property>
- <property><name>destination_table</name></property>
+ <property><name>webrequest_table</name></property>
+ <property><name>last_access_uniques_monthly_table</name></property>
<property><name>webrequest_data_directory</name></property>
<property><name>webrequest_datasets_file</name></property>
<property><name>last_access_uniques_data_directory</name></property>
<property><name>last_access_uniques_datasets_file</name></property>
+
+ <property><name>temporary_directory</name></property>
+ <property><name>uniques_archive_directory</name></property>
</parameters>
<controls>
@@ -71,7 +75,7 @@
<value>${coord:formatTime(coord:nominalTime(), "M")}</value>
</property>
<property>
- <name>destination_dataset_directory</name>
+ <name>uniques_dataset_directory</name>
<value>${coord:dataOut('last_access_uniques_monthly_output')}</value>
</property>
</configuration>
diff --git a/oozie/last_access_uniques/monthly/last_access_uniques_monthly.hql
b/oozie/last_access_uniques/monthly/last_access_uniques_monthly.hql
index 4dad91d..9ffd6cd 100644
--- a/oozie/last_access_uniques/monthly/last_access_uniques_monthly.hql
+++ b/oozie/last_access_uniques/monthly/last_access_uniques_monthly.hql
@@ -22,7 +22,6 @@
SELECT
year,
month,
- day,
lower(uri_host) as uri_host,
geocoded_data['country'] AS country,
geocoded_data['country_code'] AS country_code,
diff --git
a/oozie/last_access_uniques/monthly/last_access_uniques_monthly_to_archive.hql
b/oozie/last_access_uniques/monthly/last_access_uniques_monthly_to_archive.hql
new file mode 100644
index 0000000..8968a83
--- /dev/null
+++
b/oozie/last_access_uniques/monthly/last_access_uniques_monthly_to_archive.hql
@@ -0,0 +1,45 @@
+-- Aggregate monthly uniques on uri_host and
+-- keep only hosts having at least 1000 uniques monthly.
+--
+-- Parameters:
+-- source_table -- Table containing source data
+-- destination_directory -- Directory where to write newly computed data
+-- year -- year of the to-be-generated
+-- month -- month of the to-be-generated
+--
+-- Usage:
+-- hive -f last_access_uniques_monthly_to_archive.hql \
+-- -d source_table=wmf.last_access_uniques_monthly \
+-- -d destination_directory=/tmp/archive/last_access_uniques_monthly \
+-- -d year=2016 \
+-- -d month=1
+
+
+-- Set compression codec to gzip to provide asked format
+SET hive.exec.compress.output=true;
+SET
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
+
+
+INSERT OVERWRITE DIRECTORY "${destination_directory}"
+ -- Since "ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '" only
+ -- works for exports to local directories (see HIVE-5672), we have to
+ -- prepare the lines by hand through concatenation :-(
+ -- Set 0 as volume column since we don't use it.
+ SELECT
+ CONCAT_WS('\t', uri_host, cast(uniques_estimate AS string)) line
+ FROM (
+ SELECT
+ uri_host,
+ SUM(uniques_estimate) as uniques_estimate
+ FROM ${source_table}
+ WHERE year=${year}
+ AND month=${month}
+ GROUP BY
+ uri_host
+ HAVING
+ SUM(uniques_estimate) >= 1000
+ ORDER BY
+ uniques_estimate DESC
+ LIMIT 100000000
+ ) uniques_transformed
+;
diff --git a/oozie/last_access_uniques/monthly/workflow.xml
b/oozie/last_access_uniques/monthly/workflow.xml
index af141cc..a01a155 100644
--- a/oozie/last_access_uniques/monthly/workflow.xml
+++ b/oozie/last_access_uniques/monthly/workflow.xml
@@ -21,13 +21,13 @@
<description>hive-site.xml file path in HDFS</description>
</property>
<!-- specifying parameter values in file to test running -->
- <property>
- <name>source_table</name>
+ <property>
+ <name>webrequest_table</name>
<description>Hive table to read data from.</description>
</property>
<property>
- <name>destination_table</name>
- <description>The destinaton table to store uniques data
in.</description>
+ <name>last_access_uniques_monthly_table</name>
+ <description>The table to store uniques data in.</description>
</property>
<property>
<name>year</name>
@@ -38,8 +38,16 @@
<description>The partition's month</description>
</property>
<property>
+ <name>uniques_dataset_directory</name>
+ <description>Directory where the hive data is stored</description>
+ </property>
+ <property>
<name>mark_directory_done_workflow_file</name>
<description>Workflow for marking a directory done</description>
+ </property>
+ <property>
+ <name>archive_job_output_workflow_file</name>
+ <description>Workflow for archiving result files</description>
</property>
<property>
<name>send_error_email_workflow_file</name>
@@ -79,8 +87,8 @@
</property>
</configuration>
<script>last_access_uniques_monthly.hql</script>
- <param>source_table=${source_table}</param>
- <param>destination_table=${destination_table}</param>
+ <param>source_table=${webrequest_table}</param>
+
<param>destination_table=${last_access_uniques_monthly_table}</param>
<param>year=${year}</param>
<param>month=${month}</param>
</hive>
@@ -94,7 +102,92 @@
<configuration>
<property>
<name>directory</name>
- <value>${destination_dataset_directory}</value>
+ <value>${uniques_dataset_directory}</value>
+ </property>
+ </configuration>
+ </sub-workflow>
+ <ok to="transform_to_archive"/>
+ <error to="send_error_email"/>
+ </action>
+
+ <action name="transform_to_archive">
+ <hive xmlns="uri:oozie:hive-action:0.2">
+ <job-tracker>${job_tracker}</job-tracker>
+ <name-node>${name_node}</name-node>
+ <job-xml>${hive_site_xml}</job-xml>
+ <configuration>
+ <property>
+ <name>mapreduce.job.queuename</name>
+ <value>${queue_name}</value>
+ </property>
+ <!--make sure oozie:launcher runs in a low priority queue -->
+ <property>
+ <name>oozie.launcher.mapred.job.queue.name</name>
+ <value>${oozie_launcher_queue_name}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapreduce.map.memory.mb</name>
+ <value>${oozie_launcher_memory}</value>
+ </property>
+ <!--Let hive decide on the number of reducers -->
+ <property>
+ <name>mapred.reduce.tasks</name>
+ <value>-1</value>
+ </property>
+ <property>
+ <name>hive.exec.scratchdir</name>
+ <value>/tmp/hive-${user}</value>
+ </property>
+ </configuration>
+
+ <script>last_access_uniques_monthly_to_archive.hql</script>
+ <!-- Here, the source for archive is the
+ destination of the previous job -->
+ <param>source_table=${last_access_uniques_monthly_table}</param>
+ <param>year=${year}</param>
+ <param>month=${month}</param>
+
<param>destination_directory=${temporary_directory}/${wf:id()}</param>
+ </hive>
+
+ <ok to="mark_transformed_uniques_dataset_done"/>
+ <error to="send_error_email"/>
+ </action>
+
+ <action name="mark_transformed_uniques_dataset_done">
+ <sub-workflow>
+ <app-path>${mark_directory_done_workflow_file}</app-path>
+ <configuration>
+ <property>
+ <name>directory</name>
+ <value>${temporary_directory}/${wf:id()}</value>
+ </property>
+ </configuration>
+ </sub-workflow>
+ <ok to="move_data_to_archive"/>
+ <error to="send_error_email"/>
+ </action>
+
+ <action name="move_data_to_archive">
+ <sub-workflow>
+ <app-path>${archive_job_output_workflow_file}</app-path>
+ <propagate-configuration/>
+ <configuration>
+ <property>
+ <name>source_directory</name>
+ <value>${temporary_directory}/${wf:id()}</value>
+ </property>
+ <property>
+ <name>expected_filename_ending</name>
+ <value>.gz</value>
+ </property>
+ <property>
+ <name>archive_file</name>
+ <!--
+ webstatscollector used the end of the collection period as
+ timestamp in the filename. To not break scripts of people,
+ we also name files that way.
+ -->
+
<value>${uniques_archive_directory}/${year}/${year}-${month}/unique_devices_monthly-${year}-${month}.gz</value>
</property>
</configuration>
</sub-workflow>
--
To view, visit https://gerrit.wikimedia.org/r/274187
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I2b05e289ef2593d7a0edbb51b6b6b2fd41ba8d01
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: Joal <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits