Joal has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/347611 )

Change subject: [WIP] Add oozie jobs loading uniques in druid
......................................................................

[WIP] Add oozie jobs loading uniques in druid

The jobs are in oozie files are in oozie/last_access_uniques/druid
folder, and provide both daily and monthly jobs.

Bug: T159471
Change-Id: I043f841a7f62cf1035aa09e84934bfe6aabe95e2
---
A oozie/last_access_uniques/druid/README.md
A oozie/last_access_uniques/druid/daily/coordinator.properties
A oozie/last_access_uniques/druid/daily/coordinator.xml
A oozie/last_access_uniques/druid/daily/generate_daily_druid_uniques.hql
A oozie/last_access_uniques/druid/daily/workflow.xml
A oozie/last_access_uniques/druid/monthly/coordinator.properties
A oozie/last_access_uniques/druid/monthly/coordinator.xml
A oozie/last_access_uniques/druid/monthly/generate_monthly_druid_uniques.hql
A oozie/last_access_uniques/druid/monthly/load_uniques_monthly.json.template
A oozie/last_access_uniques/druid/monthly/workflow.xml
10 files changed, 872 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery 
refs/changes/11/347611/1

diff --git a/oozie/last_access_uniques/druid/README.md 
b/oozie/last_access_uniques/druid/README.md
new file mode 100644
index 0000000..71a972a
--- /dev/null
+++ b/oozie/last_access_uniques/druid/README.md
@@ -0,0 +1,14 @@
+Oozie jobs to schedule importing last access uniques in druid.
+Daily and monthly folders contain coordinator to load daily
+and monthly uniques in druid.
+
+The workflow launches a hive action generating json data,
+then launches the druid indexation on these files.
+The script waits for indexation to finish then delete the json files.
+
+Example command for running the coordinator on command line:
+
+    oozie job -run \
+         -config daily/coordinator.properties \
+         -D refinery_directory=hdfs://analytics-hadoop/wmf/refinery/current \
+         -D 
spark_job_jar=hdfs://analytics-hadoop/wmf/refinery/current/artifacts/refinery-job.jar
diff --git a/oozie/last_access_uniques/druid/daily/coordinator.properties 
b/oozie/last_access_uniques/druid/daily/coordinator.properties
new file mode 100644
index 0000000..bc0e92f
--- /dev/null
+++ b/oozie/last_access_uniques/druid/daily/coordinator.properties
@@ -0,0 +1,64 @@
+# Configures a coordinator to generate a day of JSON daily uniques and load 
them in druid.
+# Any of the following properties are override-able with -D.
+# Usage:
+#   oozie job -Dstart_time=2016-06-01T00:00Z -submit -config 
oozie/last_access_uniques/druid/daily/coordinator.properties
+#
+# NOTE:  The $refinery_directory must be synced to HDFS so that all relevant
+#        .xml files exist there when this job is submitted.
+
+name_node                           = hdfs://analytics-hadoop
+job_tracker                         = 
resourcemanager.analytics.eqiad.wmnet:8032
+queue_name                          = default
+
+#Default user
+user                                = hdfs
+
+# Base path in HDFS to refinery.
+# When submitting this job for production, you should override this to point 
directly at a deployed
+# directory name, and not the 'symbolic' 'current' directory. E.g. 
/wmf/refinery/2015-01-05T17.59.18Z--7bb7f07
+refinery_directory                  = ${name_node}/wmf/refinery/current
+
+# Base path in HDFS to oozie files.
+# Other files will be used relative to this path.
+oozie_directory                     = ${refinery_directory}/oozie
+
+# HDFS path to the hive-site.xml file to use
+hive_site_xml                       = ${name_node}/user/hive/hive-site.xml
+
+# HDFS path to coordinator to run.
+coordinator_file                    = 
${oozie_directory}/last_access_uniques/druid/daily/coordinator.xml
+# HDFS path to workflow to run.
+workflow_file                       = 
${oozie_directory}/last_access_uniques/druid/daily/workflow.xml
+
+# HDFS path to last_access_uniques dataset definitions
+last_access_uniques_datasets_file   = 
${oozie_directory}/last_access_uniques/datasets.xml
+last_access_uniques_data_directory  = 
${name_node}/wmf/data/wmf/last_access_uniques
+
+# last_access_uniques_daily table name
+last_access_uniques_daily_table     = wmf.last_access_uniques_daily
+
+# Initial import time of the last_access_uniques_daily dataset.
+start_time                          = 2015-12-17T00:00Z
+
+# Time to stop running this coordinator.  Year 3000 == never!
+stop_time                           = 3000-01-01T00:00Z
+
+# Temporary directory
+temporary_directory                 = ${name_node}/tmp
+
+# HDFS path to template to use.
+druid_template_file                 = 
${oozie_directory}/last_access_uniques/druid/daily/load_uniques_daily.json.template
+# Druid overlord url
+druid_overlord_url                  = http://druid1001.eqiad.wmnet:8090
+
+# HDFS path to workflow to load druid
+load_druid_workflow_file            = 
${oozie_directory}/util/druid/load/workflow.xml
+# HDFS path to workflow to mark a directory as done
+mark_directory_done_workflow_file   = 
${oozie_directory}/util/mark_directory_done/workflow.xml
+# Workflow to send an error email
+send_error_email_workflow_file      = 
${oozie_directory}/util/send_error_email/workflow.xml
+
+# Coordinator to start.
+oozie.coord.application.path        = ${coordinator_file}
+oozie.use.system.libpath            = true
+oozie.action.external.stats.write   = true
diff --git a/oozie/last_access_uniques/druid/daily/coordinator.xml 
b/oozie/last_access_uniques/druid/daily/coordinator.xml
new file mode 100644
index 0000000..f98a725
--- /dev/null
+++ b/oozie/last_access_uniques/druid/daily/coordinator.xml
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<coordinator-app xmlns="uri:oozie:coordinator:0.4"
+    name="last_access_uniques-druid-daily-coord"
+    frequency="${coord:days(1)}"
+    start="${start_time}"
+    end="${stop_time}"
+    timezone="Universal">
+
+    <parameters>
+
+        <!-- Required properties. -->
+        <property><name>name_node</name></property>
+        <property><name>job_tracker</name></property>
+        <property><name>queue_name</name></property>
+
+        <property><name>hive_site_xml</name></property>
+        <property><name>workflow_file</name></property>
+        <property><name>last_access_uniques_datasets_file</name></property>
+        <property><name>last_access_uniques_data_directory</name></property>
+        <property><name>last_access_uniques_daily_table</name></property>
+
+        <property><name>start_time</name></property>
+        <property><name>stop_time</name></property>
+
+        <property><name>druid_template_file</name></property>
+        <property><name>druid_overlord_url</name></property>
+
+        <property><name>temporary_directory</name></property>
+
+        <property><name>load_druid_workflow_file</name></property>
+        <property><name>mark_directory_done_workflow_file</name></property>
+        <property><name>send_error_email_workflow_file</name></property>
+
+    </parameters>
+
+    <controls>
+        <!--(timeout is measured in minutes)-->
+        <timeout>-1</timeout>
+
+        <!-- Setting low concurrency cause the job is hungry in resources -->
+        <concurrency>1</concurrency>
+
+        <throttle>2</throttle>
+
+    </controls>
+
+    <datasets>
+        <include>${last_access_uniques_datasets_file}</include>
+    </datasets>
+
+    <input-events>
+        <data-in name="uniques_daily_input" 
dataset="last_access_uniques_daily">
+            <!-- Dataset uses day as frequency -->
+            <start-instance>${coord:current(0)}</start-instance>
+            <end-instance>${coord:current(1)}</end-instance>
+        </data-in>
+    </input-events>
+
+    <action>
+        <workflow>
+            <app-path>${workflow_file}</app-path>
+            <configuration>
+                <property>
+                    <name>year</name>
+                    <value>${coord:formatTime(coord:nominalTime(), 
"y")}</value>
+                </property>
+                <property>
+                    <name>month</name>
+                    <value>${coord:formatTime(coord:nominalTime(), 
"M")}</value>
+                </property>
+                <property>
+                    <name>day</name>
+                    <value>${coord:formatTime(coord:nominalTime(), 
"d")}</value>
+                </property>
+                <property>
+                    <name>loaded_period</name>
+                    <value>${coord:formatTime(coord:nominalTime(), 
"yyyy-MM-dd")}/${coord:formatTime(coord:dateOffset(coord:nominalTime(), 1, 
"DAY"), "yyyy-MM-dd")}</value>
+                </property>
+            </configuration>
+        </workflow>
+    </action>
+</coordinator-app>
\ No newline at end of file
diff --git 
a/oozie/last_access_uniques/druid/daily/generate_daily_druid_uniques.hql 
b/oozie/last_access_uniques/druid/daily/generate_daily_druid_uniques.hql
new file mode 100644
index 0000000..1d670d4
--- /dev/null
+++ b/oozie/last_access_uniques/druid/daily/generate_daily_druid_uniques.hql
@@ -0,0 +1,54 @@
+-- Extracts one day of json formatted daily uniques to be loaded in Druid
+--
+-- Usage:
+--     hive -f generate_daily_druid_uniques.hql \
+--         -d source_table=wmf.last_access_uniques_daily \
+--         -d destination_directory=/tmp/druid/daily_json_uniques \
+--         -d year=2016 \
+--         -d month=7 \
+--         -d day=10
+--
+
+SET hive.exec.compress.output=true;
+SET 
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
+
+
+ADD JAR /usr/lib/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar;
+
+
+DROP TABLE IF EXISTS tmp_daily_druid_uniques_${year}_${month}_${day};
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS 
tmp_daily_druid_uniques_${year}_${month}_${day} (
+    `dt`                     string,
+    `host`                   string,
+    `country`                string,
+    `country_code`           string,
+    `uniques_underestimate`  bigint,
+    `uniques_offset`         bigint,
+    `uniques_estimate`       bigint
+)
+ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
+STORED AS TEXTFILE
+LOCATION '${destination_directory}';
+
+
+INSERT OVERWRITE TABLE tmp_daily_druid_uniques_${year}_${month}_${day}
+SELECT
+    CONCAT(
+        LPAD(year, 4, '0'), '-',
+        LPAD(month, 2, '0'), '-',
+        LPAD(day, 2, '0'), 'T00:00:00Z') AS dt,
+    uri_host AS host,
+    country AS country,
+    country_code AS country_code,
+    uniques_underestimate as uniques_underestimate,
+    uniques_offset AS uniques_offset,
+    uniques_estimate AS uniques_estimate
+FROM ${source_table}
+WHERE year = ${year}
+    AND month = ${month}
+    AND day = ${day};
+
+
+DROP TABLE IF EXISTS tmp_daily_druid_uniques_${year}_${month}_${day};
diff --git a/oozie/last_access_uniques/druid/daily/workflow.xml 
b/oozie/last_access_uniques/druid/daily/workflow.xml
new file mode 100644
index 0000000..df0fe3a
--- /dev/null
+++ b/oozie/last_access_uniques/druid/daily/workflow.xml
@@ -0,0 +1,200 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<workflow-app xmlns="uri:oozie:workflow:0.4"
+    name="last_access_uniques-druid-daily-wf-${year}-${month}-${day}">
+
+    <parameters>
+
+        <!-- Default values for inner oozie settings -->
+        <property>
+            <name>oozie_launcher_queue_name</name>
+            <value>${queue_name}</value>
+        </property>
+        <property>
+            <name>oozie_launcher_memory</name>
+            <value>2048</value>
+        </property>
+
+        <!-- Required properties -->
+        <property><name>name_node</name></property>
+        <property><name>job_tracker</name></property>
+        <property><name>queue_name</name></property>
+
+        <property>
+            <name>hive_site_xml</name>
+            <description>hive-site.xml file path in HDFS</description>
+        </property>
+
+        <property>
+            <name>last_access_uniques_daily_table</name>
+            <description>The hive last access uniques daily table to 
use</description>
+        </property>
+
+        <property>
+            <name>year</name>
+            <description>The partition's year</description>
+        </property>
+        <property>
+            <name>month</name>
+            <description>The partition's month</description>
+        </property>
+        <property>
+            <name>day</name>
+            <description>The partition's day</description>
+        </property>
+
+        <property>
+            <name>loaded_period</name>
+            <description>Period of the data loaded in interval format 
(yyyy-MM-dd/yyy-MM-dd)</description>
+        </property>
+        <property>
+            <name>druid_template_file</name>
+            <description>File to use as a template to define druid loading 
(absolute since used by load_druid sub-workflow)</description>
+        </property>
+        <property>
+            <name>druid_overlord_url</name>
+            <description>The druid overlord url used for loading</description>
+        </property>
+        <property>
+            <name>temporary_directory</name>
+            <description>A directory in HDFS for temporary files</description>
+        </property>
+        <property>
+            <name>load_druid_workflow_file</name>
+            <description>Workflow for loading druid</description>
+        </property>
+        <property>
+            <name>mark_directory_done_workflow_file</name>
+            <description>Workflow for marking a directory done</description>
+        </property>
+        <property>
+            <name>send_error_email_workflow_file</name>
+            <description>Workflow for sending an email</description>
+        </property>
+
+    </parameters>
+
+    <start to="generate_json_uniques_daily"/>
+
+    <action name="generate_json_uniques_daily">
+        <hive xmlns="uri:oozie:hive-action:0.2">
+            <job-tracker>${job_tracker}</job-tracker>
+            <name-node>${name_node}</name-node>
+            <job-xml>${hive_site_xml}</job-xml>
+            <configuration>
+                <!--make sure oozie:launcher runs in a low priority queue -->
+                <property>
+                    <name>oozie.launcher.mapred.job.queue.name</name>
+                    <value>${oozie_launcher_queue_name}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapreduce.map.memory.mb</name>
+                    <value>${oozie_launcher_memory}</value>
+                </property>
+
+                <property>
+                    <name>mapreduce.job.queuename</name>
+                    <value>${queue_name}</value>
+                </property>
+                <!--Let hive decide on the number of reducers -->
+                <property>
+                    <name>mapred.reduce.tasks</name>
+                    <value>-1</value>
+                </property>
+                <property>
+                    <name>hive.exec.scratchdir</name>
+                    <value>/tmp/hive-${user}</value>
+                </property>
+            </configuration>
+            <script>generate_daily_druid_uniques.hql</script>
+            <param>source_table=${last_access_uniques_daily_table}</param>
+            
<param>destination_directory=${temporary_directory}/${wf:id()}-daily-druid-uniques-${year}-${month}-${day}</param>
+            <param>year=${year}</param>
+            <param>month=${month}</param>
+            <param>day=${day}</param>
+        </hive>
+        <ok to="mark_json_uniques_daily_dataset_done" />
+        <error to="send_error_email" />
+    </action>
+
+    <action name="mark_json_uniques_daily_dataset_done">
+        <sub-workflow>
+            <app-path>${mark_directory_done_workflow_file}</app-path>
+            <configuration>
+                <property>
+                    <name>directory</name>
+                    
<value>${temporary_directory}/${wf:id()}-daily-druid-uniques-${year}-${month}-${day}</value>
+                </property>
+            </configuration>
+        </sub-workflow>
+        <ok to="index_druid"/>
+        <error to="send_error_email"/>
+    </action>
+
+
+    <action name="index_druid">
+        <sub-workflow>
+            <app-path>${load_druid_workflow_file}</app-path>
+            <propagate-configuration/>
+            <configuration>
+                <property>
+                    <name>source_directory</name>
+                    
<value>${temporary_directory}/${wf:id()}-daily-druid-uniques-${year}-${month}-${day}</value>
+                </property>
+                <property>
+                    <name>template_file</name>
+                    <value>${druid_template_file}</value>
+                </property>
+                <property>
+                    <name>loaded_period</name>
+                    <value>${loaded_period}</value>
+                </property>
+                <property>
+                    <name>druid_overlord_url</name>
+                    <value>${druid_overlord_url}</value>
+                </property>
+            </configuration>
+        </sub-workflow>
+        <ok to="remove_temporary_data"/>
+        <error to="send_error_email"/>
+    </action>
+
+    <action name="remove_temporary_data">
+        <fs>
+            <delete 
path="${temporary_directory}/${wf:id()}-daily-druid-uniques-${year}-${month}-${day}"/>
+        </fs>
+        <ok to="end"/>
+        <error to="send_error_email"/>
+    </action>
+
+    <action name="send_error_email">
+        <sub-workflow>
+            <app-path>${send_error_email_workflow_file}</app-path>
+            <propagate-configuration/>
+            <configuration>
+                <property>
+                    <name>parent_name</name>
+                    <value>${wf:name()}</value>
+                </property>
+                <property>
+                    <name>parent_failed_action</name>
+                    <value>${wf:lastErrorNode()}</value>
+                </property>
+                <property>
+                    <name>parent_error_code</name>
+                    <value>${wf:errorCode(wf:lastErrorNode())}</value>
+                </property>
+                <property>
+                    <name>parent_error_message</name>
+                    <value>${wf:errorMessage(wf:lastErrorNode())}</value>
+                </property>
+            </configuration>
+        </sub-workflow>
+        <ok to="kill"/>
+        <error to="kill"/>
+    </action>
+
+    <kill name="kill">
+        <message>Action failed, error 
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+    <end name="end"/>
+</workflow-app>
diff --git a/oozie/last_access_uniques/druid/monthly/coordinator.properties 
b/oozie/last_access_uniques/druid/monthly/coordinator.properties
new file mode 100644
index 0000000..fc85bb8
--- /dev/null
+++ b/oozie/last_access_uniques/druid/monthly/coordinator.properties
@@ -0,0 +1,64 @@
+# Configures a coordinator to generate a month of JSON monthly uniques and 
load them in druid.
+# Any of the following properties are override-able with -D.
+# Usage:
+#   oozie job -Dstart_time=2016-06-01T00:00Z -submit -config 
oozie/last_access_uniques/druid/monthly/coordinator.properties
+#
+# NOTE:  The $refinery_directory must be synced to HDFS so that all relevant
+#        .xml files exist there when this job is submitted.
+
+name_node                           = hdfs://analytics-hadoop
+job_tracker                         = 
resourcemanager.analytics.eqiad.wmnet:8032
+queue_name                          = default
+
+#Default user
+user                                = hdfs
+
+# Base path in HDFS to refinery.
+# When submitting this job for production, you should override this to point 
directly at a deployed
+# directory name, and not the 'symbolic' 'current' directory. E.g. 
/wmf/refinery/2015-01-05T17.59.18Z--7bb7f07
+refinery_directory                  = ${name_node}/wmf/refinery/current
+
+# Base path in HDFS to oozie files.
+# Other files will be used relative to this path.
+oozie_directory                     = ${refinery_directory}/oozie
+
+# HDFS path to the hive-site.xml file to use
+hive_site_xml                       = ${name_node}/user/hive/hive-site.xml
+
+# HDFS path to coordinator to run.
+coordinator_file                    = 
${oozie_directory}/last_access_uniques/druid/monthly/coordinator.xml
+# HDFS path to workflow to run.
+workflow_file                       = 
${oozie_directory}/last_access_uniques/druid/monthly/workflow.xml
+
+# HDFS path to last_access_uniques dataset definitions
+last_access_uniques_datasets_file   = 
${oozie_directory}/last_access_uniques/datasets.xml
+last_access_uniques_data_directory  = 
${name_node}/wmf/data/wmf/last_access_uniques
+
+# last_access_uniques_monthly table name
+last_access_uniques_monthly_table   = wmf.last_access_uniques_monthly
+
+# Initial import time of the last_access_uniques_monthly dataset.
+start_time                          = 2016-01-01T00:00Z
+
+# Time to stop running this coordinator.  Year 3000 == never!
+stop_time                           = 3000-01-01T00:00Z
+
+# Temporary directory
+temporary_directory                 = ${name_node}/tmp
+
+# HDFS path to template to use.
+druid_template_file                 = 
${oozie_directory}/last_access_uniques/druid/monthly/load_uniques_monthly.json.template
+# Druid overlord url
+druid_overlord_url                  = http://druid1001.eqiad.wmnet:8090
+
+# HDFS path to workflow to load druid
+load_druid_workflow_file            = 
${oozie_directory}/util/druid/load/workflow.xml
+# HDFS path to workflow to mark a directory as done
+mark_directory_done_workflow_file   = 
${oozie_directory}/util/mark_directory_done/workflow.xml
+# Workflow to send an error email
+send_error_email_workflow_file      = 
${oozie_directory}/util/send_error_email/workflow.xml
+
+# Coordinator to start.
+oozie.coord.application.path        = ${coordinator_file}
+oozie.use.system.libpath            = true
+oozie.action.external.stats.write   = true
diff --git a/oozie/last_access_uniques/druid/monthly/coordinator.xml 
b/oozie/last_access_uniques/druid/monthly/coordinator.xml
new file mode 100644
index 0000000..2172bd9
--- /dev/null
+++ b/oozie/last_access_uniques/druid/monthly/coordinator.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<coordinator-app xmlns="uri:oozie:coordinator:0.4"
+    name="last_access_uniques-druid-monthly-coord"
+    frequency="${coord:months(1)}"
+    start="${start_time}"
+    end="${stop_time}"
+    timezone="Universal">
+
+    <parameters>
+
+        <!-- Required properties. -->
+        <property><name>name_node</name></property>
+        <property><name>job_tracker</name></property>
+        <property><name>queue_name</name></property>
+
+        <property><name>hive_site_xml</name></property>
+        <property><name>workflow_file</name></property>
+        <property><name>last_access_uniques_datasets_file</name></property>
+        <property><name>last_access_uniques_data_directory</name></property>
+        <property><name>last_access_uniques_monthly_table</name></property>
+
+        <property><name>start_time</name></property>
+        <property><name>stop_time</name></property>
+
+        <property><name>druid_template_file</name></property>
+        <property><name>druid_overlord_url</name></property>
+
+        <property><name>temporary_directory</name></property>
+
+        <property><name>load_druid_workflow_file</name></property>
+        <property><name>mark_directory_done_workflow_file</name></property>
+        <property><name>send_error_email_workflow_file</name></property>
+
+    </parameters>
+
+    <controls>
+        <!--(timeout is measured in minutes)-->
+        <timeout>-1</timeout>
+
+        <!-- Setting low concurrency cause the job is hungry in resources -->
+        <concurrency>1</concurrency>
+
+        <throttle>2</throttle>
+
+    </controls>
+
+    <datasets>
+        <include>${last_access_uniques_datasets_file}</include>
+    </datasets>
+
+    <input-events>
+        <data-in name="uniques_monthly_input" 
dataset="last_access_uniques_monthly">
+            <!-- Dataset uses month as frequency -->
+            <start-instance>${coord:current(0)}</start-instance>
+            <end-instance>${coord:current(1)}</end-instance>
+        </data-in>
+    </input-events>
+
+    <action>
+        <workflow>
+            <app-path>${workflow_file}</app-path>
+            <configuration>
+                <property>
+                    <name>year</name>
+                    <value>${coord:formatTime(coord:nominalTime(), 
"y")}</value>
+                </property>
+                <property>
+                    <name>month</name>
+                    <value>${coord:formatTime(coord:nominalTime(), 
"M")}</value>
+                </property>
+                <property>
+                    <name>loaded_period</name>
+                    <value>${coord:formatTime(coord:nominalTime(), 
"yyyy-MM-dd")}/${coord:formatTime(coord:dateOffset(coord:nominalTime(), 1, 
"MONTH"), "yyyy-MM-dd")}</value>
+                </property>
+            </configuration>
+        </workflow>
+    </action>
+</coordinator-app>
\ No newline at end of file
diff --git 
a/oozie/last_access_uniques/druid/monthly/generate_monthly_druid_uniques.hql 
b/oozie/last_access_uniques/druid/monthly/generate_monthly_druid_uniques.hql
new file mode 100644
index 0000000..3912d68
--- /dev/null
+++ b/oozie/last_access_uniques/druid/monthly/generate_monthly_druid_uniques.hql
@@ -0,0 +1,51 @@
+-- Extracts one month of json formatted monthly uniques to be loaded in Druid
+--
+-- Usage:
+--     hive -f generate_monthly_druid_uniques.hql \
+--         -d source_table=wmf.last_access_uniques_monthly \
+--         -d destination_directory=/tmp/druid/monthly_json_uniques \
+--         -d year=2016 \
+--         -d month=7
+--
+
+SET hive.exec.compress.output=true;
+SET 
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
+
+
+ADD JAR /usr/lib/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar;
+
+
+DROP TABLE IF EXISTS tmp_monthly_druid_uniques_${year}_${month};
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS tmp_monthly_druid_uniques_${year}_${month} 
(
+    `dt`                     string,
+    `host`                   string,
+    `country`                string,
+    `country_code`           string,
+    `uniques_underestimate`  bigint,
+    `uniques_offset`         bigint,
+    `uniques_estimate`       bigint
+)
+ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
+STORED AS TEXTFILE
+LOCATION '${destination_directory}';
+
+
+INSERT OVERWRITE TABLE tmp_monthly_druid_uniques_${year}_${month}
+SELECT
+    CONCAT(
+        LPAD(year, 4, '0'), '-',
+        LPAD(month, 2, '0'), '-01T00:00:00Z') AS dt,
+    uri_host AS host,
+    country AS country,
+    country_code AS country_code,
+    uniques_underestimate as uniques_underestimate,
+    uniques_offset AS uniques_offset,
+    uniques_estimate AS uniques_estimate
+FROM ${source_table}
+WHERE year = ${year}
+    AND month = ${month};
+
+
+DROP TABLE IF EXISTS tmp_monthly_druid_uniques_${year}_${month};
diff --git 
a/oozie/last_access_uniques/druid/monthly/load_uniques_monthly.json.template 
b/oozie/last_access_uniques/druid/monthly/load_uniques_monthly.json.template
new file mode 100644
index 0000000..d057451
--- /dev/null
+++ b/oozie/last_access_uniques/druid/monthly/load_uniques_monthly.json.template
@@ -0,0 +1,70 @@
+{
+  "type" : "index_hadoop",
+  "spec" : {
+    "ioConfig" : {
+      "type" : "hadoop",
+      "inputSpec" : {
+        "type" : "static",
+        "paths" : "*INPUT_PATH*"
+      }
+    },
+    "dataSchema" : {
+      "dataSource" : "unique-devices-monthly",
+      "granularitySpec" : {
+        "type" : "uniform",
+        "segmentGranularity" : "month",
+        "queryGranularity" : "month",
+        "intervals" : *INTERVALS_ARRAY*
+      },
+      "parser" : {
+        "type" : "string",
+        "parseSpec" : {
+          "format" : "json",
+          "dimensionsSpec" : {
+            "dimensions" : [
+                "host",
+                "country",
+                "country_code"
+            ]
+          },
+          "timestampSpec" : {
+            "format" : "auto",
+            "column" : "dt"
+          }
+        }
+      },
+      "metricsSpec" : [
+        {
+          "name" : "uniques_underestimate",
+          "type" : "longSum",
+          "fieldName": "uniques_underestimate"
+        },
+        {
+          "name" : "uniques_offset",
+          "type" : "longSum",
+          "fieldName": "uniques_offset"
+        },
+        {
+          "name" : "uniques_estimate",
+          "type" : "longSum",
+          "fieldName": "uniques_estimate"
+        }
+      ]
+    },
+    "tuningConfig" : {
+      "type" : "hadoop",
+      "overwriteFiles": true,
+      "ignoreInvalidRows" : false,
+      "partitionsSpec" : {
+        "type" : "hashed",
+        "numShards" : 1
+      },
+      "jobProperties" : {
+        "mapreduce.reduce.memory.mb" : "8192",
+        "mapreduce.output.fileoutputformat.compress": 
"org.apache.hadoop.io.compress.GzipCodec",
+        "mapreduce.job.queuename": "*HADOOP_QUEUE*"
+      }
+    }
+  }
+}
+
diff --git a/oozie/last_access_uniques/druid/monthly/workflow.xml 
b/oozie/last_access_uniques/druid/monthly/workflow.xml
new file mode 100644
index 0000000..4c098e4
--- /dev/null
+++ b/oozie/last_access_uniques/druid/monthly/workflow.xml
@@ -0,0 +1,195 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<workflow-app xmlns="uri:oozie:workflow:0.4"
+    name="last_access_uniques-druid-monthly-wf-${year}-${month}">
+
+    <parameters>
+
+        <!-- Default values for inner oozie settings -->
+        <property>
+            <name>oozie_launcher_queue_name</name>
+            <value>${queue_name}</value>
+        </property>
+        <property>
+            <name>oozie_launcher_memory</name>
+            <value>2048</value>
+        </property>
+
+        <!-- Required properties -->
+        <property><name>name_node</name></property>
+        <property><name>job_tracker</name></property>
+        <property><name>queue_name</name></property>
+
+        <property>
+            <name>hive_site_xml</name>
+            <description>hive-site.xml file path in HDFS</description>
+        </property>
+
+        <property>
+            <name>last_access_uniques_monthly_table</name>
+            <description>The hive pageview table to use</description>
+        </property>
+
+        <property>
+            <name>year</name>
+            <description>The partition's year</description>
+        </property>
+        <property>
+            <name>month</name>
+            <description>The partition's month</description>
+        </property>
+
+        <property>
+            <name>loaded_period</name>
+            <description>Period of the data loaded in interval format 
(yyyy-MM-dd/yyy-MM-dd)</description>
+        </property>
+        <property>
+            <name>druid_template_file</name>
+            <description>File to use as a template to define druid loading 
(absolute since used by load_druid sub-workflow)</description>
+        </property>
+        <property>
+            <name>druid_overlord_url</name>
+            <description>The druid overlord url used for loading</description>
+        </property>
+        <property>
+            <name>temporary_directory</name>
+            <description>A directory in HDFS for temporary files</description>
+        </property>
+        <property>
+            <name>load_druid_workflow_file</name>
+            <description>Workflow for loading druid</description>
+        </property>
+        <property>
+            <name>mark_directory_done_workflow_file</name>
+            <description>Workflow for marking a directory done</description>
+        </property>
+        <property>
+            <name>send_error_email_workflow_file</name>
+            <description>Workflow for sending an email</description>
+        </property>
+
+    </parameters>
+
+    <start to="generate_json_uniques_monthly"/>
+
+    <action name="generate_json_uniques_monthly">
+        <hive xmlns="uri:oozie:hive-action:0.2">
+            <job-tracker>${job_tracker}</job-tracker>
+            <name-node>${name_node}</name-node>
+            <job-xml>${hive_site_xml}</job-xml>
+            <configuration>
+                <!--make sure oozie:launcher runs in a low priority queue -->
+                <property>
+                    <name>oozie.launcher.mapred.job.queue.name</name>
+                    <value>${oozie_launcher_queue_name}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapreduce.map.memory.mb</name>
+                    <value>${oozie_launcher_memory}</value>
+                </property>
+
+                <property>
+                    <name>mapreduce.job.queuename</name>
+                    <value>${queue_name}</value>
+                </property>
+                <!--Let hive decide on the number of reducers -->
+                <property>
+                    <name>mapred.reduce.tasks</name>
+                    <value>-1</value>
+                </property>
+                <property>
+                    <name>hive.exec.scratchdir</name>
+                    <value>/tmp/hive-${user}</value>
+                </property>
+            </configuration>
+            <script>generate_monthly_druid_uniques.hql</script>
+            <param>source_table=${last_access_uniques_monthly_table}</param>
+            
<param>destination_directory=${temporary_directory}/${wf:id()}-monthly-druid-uniques-${year}-${month}</param>
+            <param>year=${year}</param>
+            <param>month=${month}</param>
+        </hive>
+        <ok to="mark_json_uniques_monthly_dataset_done" />
+        <error to="send_error_email" />
+    </action>
+
+    <action name="mark_json_uniques_monthly_dataset_done">
+        <sub-workflow>
+            <app-path>${mark_directory_done_workflow_file}</app-path>
+            <configuration>
+                <property>
+                    <name>directory</name>
+                    
<value>${temporary_directory}/${wf:id()}-monthly-druid-uniques-${year}-${month}</value>
+                </property>
+            </configuration>
+        </sub-workflow>
+        <ok to="index_druid"/>
+        <error to="send_error_email"/>
+    </action>
+
+
+    <action name="index_druid">
+        <sub-workflow>
+            <app-path>${load_druid_workflow_file}</app-path>
+            <propagate-configuration/>
+            <configuration>
+                <property>
+                    <name>source_directory</name>
+                    
<value>${temporary_directory}/${wf:id()}-monthly-druid-uniques-${year}-${month}</value>
+                </property>
+                <property>
+                    <name>template_file</name>
+                    <value>${druid_template_file}</value>
+                </property>
+                <property>
+                    <name>loaded_period</name>
+                    <value>${loaded_period}</value>
+                </property>
+                <property>
+                    <name>druid_overlord_url</name>
+                    <value>${druid_overlord_url}</value>
+                </property>
+            </configuration>
+        </sub-workflow>
+        <ok to="remove_temporary_data"/>
+        <error to="send_error_email"/>
+    </action>
+
+    <action name="remove_temporary_data">
+        <fs>
+            <delete 
path="${temporary_directory}/${wf:id()}-monthly-druid-uniques-${year}-${month}"/>
+        </fs>
+        <ok to="end"/>
+        <error to="send_error_email"/>
+    </action>
+
+    <action name="send_error_email">
+        <sub-workflow>
+            <app-path>${send_error_email_workflow_file}</app-path>
+            <propagate-configuration/>
+            <configuration>
+                <property>
+                    <name>parent_name</name>
+                    <value>${wf:name()}</value>
+                </property>
+                <property>
+                    <name>parent_failed_action</name>
+                    <value>${wf:lastErrorNode()}</value>
+                </property>
+                <property>
+                    <name>parent_error_code</name>
+                    <value>${wf:errorCode(wf:lastErrorNode())}</value>
+                </property>
+                <property>
+                    <name>parent_error_message</name>
+                    <value>${wf:errorMessage(wf:lastErrorNode())}</value>
+                </property>
+            </configuration>
+        </sub-workflow>
+        <ok to="kill"/>
+        <error to="kill"/>
+    </action>
+
+    <kill name="kill">
+        <message>Action failed, error 
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+    <end name="end"/>
+</workflow-app>

-- 
To view, visit https://gerrit.wikimedia.org/r/347611
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I043f841a7f62cf1035aa09e84934bfe6aabe95e2
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: Joal <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to