Joal has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/274187

Change subject: Add archive stage to last_access_uniques jobs
......................................................................

Add archive stage to last_access_uniques jobs

Aggregate daily and monthly uniques on uri_host and
keep only hosts having at least 1000 uniques.
Archive in hdfs:///wmf/data/archive/unique_devices/{year}/{year}-{month}

Bug: T126767
Change-Id: I2b05e289ef2593d7a0edbb51b6b6b2fd41ba8d01
---
M oozie/last_access_uniques/daily/coordinator.properties
M oozie/last_access_uniques/daily/coordinator.xml
A oozie/last_access_uniques/daily/last_access_uniques_daily_to_archive.hql
M oozie/last_access_uniques/daily/workflow.xml
M oozie/last_access_uniques/monthly/coordinator.properties
M oozie/last_access_uniques/monthly/coordinator.xml
M oozie/last_access_uniques/monthly/last_access_uniques_monthly.hql
A oozie/last_access_uniques/monthly/last_access_uniques_monthly_to_archive.hql
M oozie/last_access_uniques/monthly/workflow.xml
9 files changed, 333 insertions(+), 27 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery 
refs/changes/87/274187/1

diff --git a/oozie/last_access_uniques/daily/coordinator.properties 
b/oozie/last_access_uniques/daily/coordinator.properties
index 56fb0ac..7b83337 100644
--- a/oozie/last_access_uniques/daily/coordinator.properties
+++ b/oozie/last_access_uniques/daily/coordinator.properties
@@ -1,5 +1,6 @@
 # Configures a coordinator to generate daily uniques using last_access method
-# on webrequest table.
+# on webrequest table and save a reduced version (uri_host aggregates with
+# at least 1000 uniques) in archive folders.
 # Any of the following properties are overidable with -D.
 # Usage:
 #   oozie job -Duser=$USER -Dstart_time=2015-01-05T00:00Z -submit -config 
oozie/last_access_uniques/daily/coordinator.properties
@@ -22,10 +23,11 @@
 
 # Sub worflows path
 mark_directory_done_workflow_file = 
${oozie_directory}/util/mark_directory_done/workflow.xml
+archive_job_output_workflow_file  = 
${oozie_directory}/util/archive_job_output/workflow.xml
 send_error_email_workflow_file    = 
${oozie_directory}/util/send_error_email/workflow.xml
 
-source_table                      = wmf.webrequest
-destination_table                 = wmf.last_access_uniques_daily
+webrequest_table                  = wmf.webrequest
+last_access_uniques_daily_table   = wmf.last_access_uniques_daily
 
 # HDFS path to webrequest dataset definition
 webrequest_data_directory         = ${name_node}/wmf/data/wmf/webrequest
@@ -38,6 +40,14 @@
 user                              = hdfs
 workflow_file                     = 
${oozie_directory}/last_access_uniques/daily/workflow.xml
 
+temporary_directory               = ${name_node}/tmp
+
+# Archive base directory
+archive_directory                 = ${name_node}/wmf/data/archive
+
+# Archive directory for webrequest data
+uniques_archive_directory         = ${archive_directory}/unique_devices
+
 # Coordinator app to run.
 oozie.use.system.libpath          = true
 oozie.action.external.stats.write = true
diff --git a/oozie/last_access_uniques/daily/coordinator.xml 
b/oozie/last_access_uniques/daily/coordinator.xml
index 09b5f14..7764c5b 100644
--- a/oozie/last_access_uniques/daily/coordinator.xml
+++ b/oozie/last_access_uniques/daily/coordinator.xml
@@ -10,18 +10,22 @@
         <property><name>name_node</name></property>
         <property><name>job_tracker</name></property>
         <property><name>mark_directory_done_workflow_file</name></property>
+        <property><name>archive_job_output_workflow_file</name></property>
         <property><name>send_error_email_workflow_file</name></property>
         <property><name>workflow_file</name></property>
         <property><name>start_time</name></property>
         <property><name>stop_time</name></property>
 
-        <property><name>source_table</name></property>
-        <property><name>destination_table</name></property>
+        <property><name>webrequest_table</name></property>
+        <property><name>last_access_uniques_daily_table</name></property>
 
         <property><name>webrequest_data_directory</name></property>
         <property><name>webrequest_datasets_file</name></property>
         <property><name>last_access_uniques_data_directory</name></property>
         <property><name>last_access_uniques_datasets_file</name></property>
+
+        <property><name>temporary_directory</name></property>
+        <property><name>uniques_archive_directory</name></property>
     </parameters>
 
     <controls>
@@ -76,7 +80,7 @@
                   <value>${coord:formatTime(coord:nominalTime(), "d")}</value>
               </property>
               <property>
-                    <name>destination_dataset_directory</name>
+                    <name>uniques_dataset_directory</name>
                     
<value>${coord:dataOut('last_access_uniques_daily_output')}</value>
                 </property>
           </configuration>
diff --git 
a/oozie/last_access_uniques/daily/last_access_uniques_daily_to_archive.hql 
b/oozie/last_access_uniques/daily/last_access_uniques_daily_to_archive.hql
new file mode 100644
index 0000000..2efc6c9
--- /dev/null
+++ b/oozie/last_access_uniques/daily/last_access_uniques_daily_to_archive.hql
@@ -0,0 +1,48 @@
+-- Aggregate daily uniques on uri_host and
+-- keep only hosts having at least 1000 uniques daily.
+--
+-- Parameters:
+--     source_table           -- Table containing source data
+--     destination_directory  -- Directory where to write newly computed data
+--     year                   -- year of the to-be-generated data
+--     month                  -- month of the to-be-generated data
+--     day                    -- day of the to-be-generated data
+--
+-- Usage:
+--     hive -f last_access_uniques_daily_to_archive.hql \
+--         -d source_table=wmf.last_access_uniques_daily \
+--         -d destination_directory=/tmp/archive/last_access_uniques_daily \
+--         -d year=2016 \
+--         -d month=1 \
+--         -d day=1
+
+
+-- Set compression codec to gzip to provide asked format
+SET hive.exec.compress.output=true;
+SET 
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
+
+
+INSERT OVERWRITE DIRECTORY "${destination_directory}"
+    -- Since "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'" only
+    -- works for exports to local directories (see HIVE-5672), we have to
+    -- prepare the lines by hand through concatenation :-(
+    -- Only uri_host and its summed uniques estimate are exported.
+    SELECT
+        CONCAT_WS('\t', uri_host, cast(uniques_estimate AS string)) line
+    FROM (
+        SELECT
+            uri_host,
+            SUM(uniques_estimate) as uniques_estimate
+        FROM ${source_table}
+        WHERE year=${year}
+            AND month=${month}
+            AND day=${day}
+        GROUP BY
+            uri_host
+        HAVING
+            SUM(uniques_estimate) >= 1000
+        ORDER BY
+            uniques_estimate DESC
+        LIMIT 100000000
+    ) uniques_transformed
+;
diff --git a/oozie/last_access_uniques/daily/workflow.xml 
b/oozie/last_access_uniques/daily/workflow.xml
index d875f18..73f0f65 100644
--- a/oozie/last_access_uniques/daily/workflow.xml
+++ b/oozie/last_access_uniques/daily/workflow.xml
@@ -22,12 +22,12 @@
         </property>
         <!-- specifying parameter values in file to test running -->
         <property>
-            <name>source_table</name>
+            <name>webrequest_table</name>
             <description>Hive table to read data from.</description>
         </property>
         <property>
-            <name>destination_table</name>
-            <description>The destinaton table to store uniques data 
in.</description>
+            <name>last_access_uniques_daily_table</name>
+            <description>The table to store uniques data in.</description>
         </property>
         <property>
             <name>year</name>
@@ -42,8 +42,16 @@
             <description>The partition's day</description>
         </property>
         <property>
+            <name>uniques_dataset_directory</name>
+            <description>Directory where the hive data is stored</description>
+        </property>
+        <property>
             <name>mark_directory_done_workflow_file</name>
             <description>Workflow for marking a directory done</description>
+        </property>
+        <property>
+            <name>archive_job_output_workflow_file</name>
+            <description>Workflow for archiving result files</description>
         </property>
         <property>
             <name>send_error_email_workflow_file</name>
@@ -83,8 +91,8 @@
                 </property>
             </configuration>
             <script>last_access_uniques_daily.hql</script>
-            <param>source_table=${source_table}</param>
-            <param>destination_table=${destination_table}</param>
+            <param>source_table=${webrequest_table}</param>
+            <param>destination_table=${last_access_uniques_daily_table}</param>
             <param>year=${year}</param>
             <param>month=${month}</param>
             <param>day=${day}</param>
@@ -99,7 +107,93 @@
             <configuration>
                 <property>
                     <name>directory</name>
-                    <value>${destination_dataset_directory}</value>
+                    <value>${uniques_dataset_directory}</value>
+                </property>
+            </configuration>
+        </sub-workflow>
+        <ok to="transform_to_archive"/>
+        <error to="send_error_email"/>
+    </action>
+
+    <action name="transform_to_archive">
+        <hive xmlns="uri:oozie:hive-action:0.2">
+            <job-tracker>${job_tracker}</job-tracker>
+            <name-node>${name_node}</name-node>
+            <job-xml>${hive_site_xml}</job-xml>
+            <configuration>
+                <property>
+                    <name>mapreduce.job.queuename</name>
+                    <value>${queue_name}</value>
+                </property>
+                <!--make sure oozie:launcher runs in a low priority queue -->
+                <property>
+                    <name>oozie.launcher.mapred.job.queue.name</name>
+                    <value>${oozie_launcher_queue_name}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapreduce.map.memory.mb</name>
+                    <value>${oozie_launcher_memory}</value>
+                </property>
+                <!--Let hive decide on the number of reducers -->
+                <property>
+                    <name>mapred.reduce.tasks</name>
+                    <value>-1</value>
+                </property>
+                <property>
+                    <name>hive.exec.scratchdir</name>
+                    <value>/tmp/hive-${user}</value>
+                </property>
+            </configuration>
+
+            <script>last_access_uniques_daily_to_archive.hql</script>
+            <!-- Here, the source for archive is the
+                 destination of the previous job -->
+            <param>source_table=${last_access_uniques_daily_table}</param>
+            <param>year=${year}</param>
+            <param>month=${month}</param>
+            <param>day=${day}</param>
+            
<param>destination_directory=${temporary_directory}/${wf:id()}</param>
+        </hive>
+
+        <ok to="mark_transformed_uniques_dataset_done"/>
+        <error to="send_error_email"/>
+    </action>
+
+    <action name="mark_transformed_uniques_dataset_done">
+        <sub-workflow>
+            <app-path>${mark_directory_done_workflow_file}</app-path>
+            <configuration>
+                <property>
+                    <name>directory</name>
+                    <value>${temporary_directory}/${wf:id()}</value>
+                </property>
+            </configuration>
+        </sub-workflow>
+        <ok to="move_data_to_archive"/>
+        <error to="send_error_email"/>
+    </action>
+
+    <action name="move_data_to_archive">
+        <sub-workflow>
+            <app-path>${archive_job_output_workflow_file}</app-path>
+            <propagate-configuration/>
+            <configuration>
+                <property>
+                    <name>source_directory</name>
+                    <value>${temporary_directory}/${wf:id()}</value>
+                </property>
+                <property>
+                    <name>expected_filename_ending</name>
+                    <value>.gz</value>
+                </property>
+                <property>
+                    <name>archive_file</name>
+                    <!--
+                    webstatscollector used the end of the collection period as
+                    timestamp in the filename. To not break scripts of people,
+                    we also name files that way.
+                    -->
+                    
<value>${uniques_archive_directory}/${year}/${year}-${month}/unique_devices_daily-${year}-${month}-${day}.gz</value>
                 </property>
             </configuration>
         </sub-workflow>
diff --git a/oozie/last_access_uniques/monthly/coordinator.properties 
b/oozie/last_access_uniques/monthly/coordinator.properties
index d1adc82..b2701ae 100644
--- a/oozie/last_access_uniques/monthly/coordinator.properties
+++ b/oozie/last_access_uniques/monthly/coordinator.properties
@@ -1,6 +1,6 @@
-
 # Configures a coordinator to generate daily uniques using last_access method
-# on webrequest table.
+# on webrequest table and save a reduced version (uri_host aggregates with
+# at least 1000 uniques) in archive folders.
 # Any of the following properties are overidable with -D.
 # Usage:
 #   oozie job -Duser=$USER -Dstart_time=2015-01-05T00:00Z -submit -config 
oozie/last_access_uniques/daily/coordinator.properties
@@ -22,10 +22,11 @@
 
 # Sub worflows path
 mark_directory_done_workflow_file = 
${oozie_directory}/util/mark_directory_done/workflow.xml
+archive_job_output_workflow_file  = 
${oozie_directory}/util/archive_job_output/workflow.xml
 send_error_email_workflow_file    = 
${oozie_directory}/util/send_error_email/workflow.xml
 
-source_table                      = wmf.webrequest
-destination_table                 = wmf.last_access_uniques_monthly
+webrequest_table                  = wmf.webrequest
+last_access_uniques_monthly_table = wmf.last_access_uniques_monthly
 
 # HDFS path to webrequest dataset definition
 webrequest_data_directory         = ${name_node}/wmf/data/wmf/webrequest
@@ -35,6 +36,14 @@
 last_access_uniques_data_directory = 
${name_node}/wmf/data/wmf/last_access_uniques
 last_access_uniques_datasets_file = 
${oozie_directory}/last_access_uniques/datasets.xml
 
+temporary_directory               = ${name_node}/tmp
+
+# Archive base directory
+archive_directory                 = ${name_node}/wmf/data/archive
+
+# Archive directory for webrequest data
+uniques_archive_directory         = ${archive_directory}/unique_devices
+
 user                              = hdfs
 workflow_file                     = 
${oozie_directory}/last_access_uniques/monthly/workflow.xml
 
diff --git a/oozie/last_access_uniques/monthly/coordinator.xml 
b/oozie/last_access_uniques/monthly/coordinator.xml
index eaad896..d5b8071 100644
--- a/oozie/last_access_uniques/monthly/coordinator.xml
+++ b/oozie/last_access_uniques/monthly/coordinator.xml
@@ -10,18 +10,22 @@
         <property><name>name_node</name></property>
         <property><name>job_tracker</name></property>
         <property><name>mark_directory_done_workflow_file</name></property>
+        <property><name>archive_job_output_workflow_file</name></property>
         <property><name>send_error_email_workflow_file</name></property>
         <property><name>workflow_file</name></property>
         <property><name>start_time</name></property>
         <property><name>stop_time</name></property>
 
-        <property><name>source_table</name></property>
-        <property><name>destination_table</name></property>
+        <property><name>webrequest_table</name></property>
+        <property><name>last_access_uniques_monthly_table</name></property>
 
         <property><name>webrequest_data_directory</name></property>
         <property><name>webrequest_datasets_file</name></property>
         <property><name>last_access_uniques_data_directory</name></property>
         <property><name>last_access_uniques_datasets_file</name></property>
+
+        <property><name>temporary_directory</name></property>
+        <property><name>uniques_archive_directory</name></property>
     </parameters>
 
     <controls>
@@ -71,7 +75,7 @@
                   <value>${coord:formatTime(coord:nominalTime(), "M")}</value>
               </property>
               <property>
-                    <name>destination_dataset_directory</name>
+                    <name>uniques_dataset_directory</name>
                     
<value>${coord:dataOut('last_access_uniques_monthly_output')}</value>
                 </property>
           </configuration>
diff --git a/oozie/last_access_uniques/monthly/last_access_uniques_monthly.hql 
b/oozie/last_access_uniques/monthly/last_access_uniques_monthly.hql
index 4dad91d..9ffd6cd 100644
--- a/oozie/last_access_uniques/monthly/last_access_uniques_monthly.hql
+++ b/oozie/last_access_uniques/monthly/last_access_uniques_monthly.hql
@@ -22,7 +22,6 @@
     SELECT
         year,
         month,
-        day,
         lower(uri_host) as uri_host,
         geocoded_data['country'] AS country,
         geocoded_data['country_code'] AS country_code,
diff --git 
a/oozie/last_access_uniques/monthly/last_access_uniques_monthly_to_archive.hql 
b/oozie/last_access_uniques/monthly/last_access_uniques_monthly_to_archive.hql
new file mode 100644
index 0000000..8968a83
--- /dev/null
+++ 
b/oozie/last_access_uniques/monthly/last_access_uniques_monthly_to_archive.hql
@@ -0,0 +1,45 @@
+-- Aggregate monthly uniques on uri_host and
+-- keep only hosts having at least 1000 uniques monthly.
+--
+-- Parameters:
+--     source_table           -- Table containing source data
+--     destination_directory  -- Directory where to write newly computed data
+--     year                   -- year of the to-be-generated data
+--     month                  -- month of the to-be-generated data
+--
+-- Usage:
+--     hive -f last_access_uniques_monthly_to_archive.hql \
+--         -d source_table=wmf.last_access_uniques_monthly \
+--         -d destination_directory=/tmp/archive/last_access_uniques_monthly \
+--         -d year=2016 \
+--         -d month=1
+
+
+-- Set compression codec to gzip to provide asked format
+SET hive.exec.compress.output=true;
+SET 
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
+
+
+INSERT OVERWRITE DIRECTORY "${destination_directory}"
+    -- Since "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'" only
+    -- works for exports to local directories (see HIVE-5672), we have to
+    -- prepare the lines by hand through concatenation :-(
+    -- Only uri_host and its summed uniques estimate are exported.
+    SELECT
+        CONCAT_WS('\t', uri_host, cast(uniques_estimate AS string)) line
+    FROM (
+        SELECT
+            uri_host,
+            SUM(uniques_estimate) as uniques_estimate
+        FROM ${source_table}
+        WHERE year=${year}
+            AND month=${month}
+        GROUP BY
+            uri_host
+        HAVING
+            SUM(uniques_estimate) >= 1000
+        ORDER BY
+            uniques_estimate DESC
+        LIMIT 100000000
+    ) uniques_transformed
+;
diff --git a/oozie/last_access_uniques/monthly/workflow.xml 
b/oozie/last_access_uniques/monthly/workflow.xml
index af141cc..a01a155 100644
--- a/oozie/last_access_uniques/monthly/workflow.xml
+++ b/oozie/last_access_uniques/monthly/workflow.xml
@@ -21,13 +21,13 @@
             <description>hive-site.xml file path in HDFS</description>
         </property>
         <!-- specifying parameter values in file to test running -->
-        <property>
-            <name>source_table</name>
+        <property>
+            <name>webrequest_table</name>
             <description>Hive table to read data from.</description>
         </property>
         <property>
-            <name>destination_table</name>
-            <description>The destinaton table to store uniques data 
in.</description>
+            <name>last_access_uniques_monthly_table</name>
+            <description>The table to store uniques data in.</description>
         </property>
         <property>
             <name>year</name>
@@ -38,8 +38,16 @@
             <description>The partition's month</description>
         </property>
         <property>
+            <name>uniques_dataset_directory</name>
+            <description>Directory where the hive data is stored</description>
+        </property>
+        <property>
             <name>mark_directory_done_workflow_file</name>
             <description>Workflow for marking a directory done</description>
+        </property>
+        <property>
+            <name>archive_job_output_workflow_file</name>
+            <description>Workflow for archiving result files</description>
         </property>
         <property>
             <name>send_error_email_workflow_file</name>
@@ -79,8 +87,8 @@
                 </property>
             </configuration>
             <script>last_access_uniques_monthly.hql</script>
-            <param>source_table=${source_table}</param>
-            <param>destination_table=${destination_table}</param>
+            <param>source_table=${webrequest_table}</param>
+            
<param>destination_table=${last_access_uniques_monthly_table}</param>
             <param>year=${year}</param>
             <param>month=${month}</param>
         </hive>
@@ -94,7 +102,92 @@
             <configuration>
                 <property>
                     <name>directory</name>
-                    <value>${destination_dataset_directory}</value>
+                    <value>${uniques_dataset_directory}</value>
+                </property>
+            </configuration>
+        </sub-workflow>
+        <ok to="transform_to_archive"/>
+        <error to="send_error_email"/>
+    </action>
+
+    <action name="transform_to_archive">
+        <hive xmlns="uri:oozie:hive-action:0.2">
+            <job-tracker>${job_tracker}</job-tracker>
+            <name-node>${name_node}</name-node>
+            <job-xml>${hive_site_xml}</job-xml>
+            <configuration>
+                <property>
+                    <name>mapreduce.job.queuename</name>
+                    <value>${queue_name}</value>
+                </property>
+                <!--make sure oozie:launcher runs in a low priority queue -->
+                <property>
+                    <name>oozie.launcher.mapred.job.queue.name</name>
+                    <value>${oozie_launcher_queue_name}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapreduce.map.memory.mb</name>
+                    <value>${oozie_launcher_memory}</value>
+                </property>
+                <!--Let hive decide on the number of reducers -->
+                <property>
+                    <name>mapred.reduce.tasks</name>
+                    <value>-1</value>
+                </property>
+                <property>
+                    <name>hive.exec.scratchdir</name>
+                    <value>/tmp/hive-${user}</value>
+                </property>
+            </configuration>
+
+            <script>last_access_uniques_monthly_to_archive.hql</script>
+            <!-- Here, the source for archive is the
+                 destination of the previous job -->
+            <param>source_table=${last_access_uniques_monthly_table}</param>
+            <param>year=${year}</param>
+            <param>month=${month}</param>
+            
<param>destination_directory=${temporary_directory}/${wf:id()}</param>
+        </hive>
+
+        <ok to="mark_transformed_uniques_dataset_done"/>
+        <error to="send_error_email"/>
+    </action>
+
+    <action name="mark_transformed_uniques_dataset_done">
+        <sub-workflow>
+            <app-path>${mark_directory_done_workflow_file}</app-path>
+            <configuration>
+                <property>
+                    <name>directory</name>
+                    <value>${temporary_directory}/${wf:id()}</value>
+                </property>
+            </configuration>
+        </sub-workflow>
+        <ok to="move_data_to_archive"/>
+        <error to="send_error_email"/>
+    </action>
+
+    <action name="move_data_to_archive">
+        <sub-workflow>
+            <app-path>${archive_job_output_workflow_file}</app-path>
+            <propagate-configuration/>
+            <configuration>
+                <property>
+                    <name>source_directory</name>
+                    <value>${temporary_directory}/${wf:id()}</value>
+                </property>
+                <property>
+                    <name>expected_filename_ending</name>
+                    <value>.gz</value>
+                </property>
+                <property>
+                    <name>archive_file</name>
+                    <!--
+                    webstatscollector used the end of the collection period as
+                    timestamp in the filename. To not break scripts of people,
+                    we also name files that way.
+                    -->
+                    
<value>${uniques_archive_directory}/${year}/${year}-${month}/unique_devices_monthly-${year}-${month}.gz</value>
                 </property>
             </configuration>
         </sub-workflow>

-- 
To view, visit https://gerrit.wikimedia.org/r/274187
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I2b05e289ef2593d7a0edbb51b6b6b2fd41ba8d01
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: Joal <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to