Mforns has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/339421 )

Change subject: Add oozie workflow to load projectcounts to AQS
......................................................................

Add oozie workflow to load projectcounts to AQS

Bug: T156388
Change-Id: I734b54d3bd95d373b76163d3caac7c90a07e143e
---
A oozie/cassandra/historical/projectcounts_per_project.hql
A oozie/cassandra/historical/projectcounts_per_project.properties
A oozie/cassandra/historical/workflow.xml
3 files changed, 577 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery 
refs/changes/21/339421/1

diff --git a/oozie/cassandra/historical/projectcounts_per_project.hql 
b/oozie/cassandra/historical/projectcounts_per_project.hql
new file mode 100644
index 0000000..e57f9f5
--- /dev/null
+++ b/oozie/cassandra/historical/projectcounts_per_project.hql
@@ -0,0 +1,69 @@
+-- Parameters:
+--     destination_directory -- HDFS path to write output files
+--     source_table_1        -- Fully qualified projectcounts table
+--     source_table_2        -- Fully qualified abbreviation map table
+--     separator             -- Separator for values
+--
+-- Usage:
+--     hive -f projectcounts_per_project.hql                          \
+--         -d destination_directory=/tmp/projectcounts_per_project    \
+--         -d source_table_1=wmf.projectcounts_raw                    \
+--         -d source_table_2=wmf.abbreviation_domain_map              \
+--         -d separator=\t
+--
+
+
+-- Compress the exported files with gzip.
+SET hive.exec.compress.output=true;
+SET mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
+
+
+INSERT OVERWRITE DIRECTORY "${destination_directory}"
+    -- Since "ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '" only
+    -- works for exports to local directories (see HIVE-5672), we have to
+    -- prepare the lines by hand through concatenation :-(
+    SELECT
+        CONCAT_WS('${separator}',
+            COALESCE(hostname, 'all-projects'),
+            COALESCE(access_method, 'all-access'),
+            CONCAT(
+                LPAD(year, 4, "0"),
+                LPAD(month, 2, "0"),
+                LPAD(day, 2, "0"),
+                LPAD(hour, 2, "0")
+            ),
+            CAST(SUM(view_count) AS STRING)
+        )
+    FROM (
+        -- Fold 'mobile' and 'zero' into 'mobile-web' BEFORE aggregating, so
+        -- that both are summed into one row. Mapping after the GROUP BY
+        -- would emit two rows with the same output key, each holding only
+        -- a partial count.
+        SELECT
+            hostname,
+            IF(access_site IN ('mobile', 'zero'), 'mobile-web', access_site) AS access_method,
+            year,
+            month,
+            day,
+            hour,
+            view_count
+        FROM ${source_table_1}
+        INNER JOIN ${source_table_2}
+            ON wiki_code = abbreviation
+    ) normalized
+    GROUP BY
+        hostname,
+        access_method,
+        year,
+        month,
+        day,
+        hour
+    -- One grouping set per aggregation level: project and access, project
+    -- only, access only, and overall. A column left out of a set comes back
+    -- as NULL, which the COALESCEs above turn into the 'all-...' labels.
+    GROUPING SETS (
+        (hostname, access_method, year, month, day, hour),
+        (hostname, year, month, day, hour),
+        (access_method, year, month, day, hour),
+        (year, month, day, hour)
+    );
diff --git a/oozie/cassandra/historical/projectcounts_per_project.properties 
b/oozie/cassandra/historical/projectcounts_per_project.properties
new file mode 100644
index 0000000..fbd49cb
--- /dev/null
+++ b/oozie/cassandra/historical/projectcounts_per_project.properties
@@ -0,0 +1,81 @@
+# Configures a workflow to manage loading cassandra for the per_project
+# historical pageview API. Any of the following properties are overridable with -D.
+# Usage:
+#   oozie job -Duser=$USER -submit -config oozie/cassandra/historical/projectcounts_per_project.properties
+#
+# NOTE:  The $oozie_directory must be synced to HDFS so that all relevant
+#        .xml files exist there when this job is submitted.
+
+
+name_node                         = hdfs://analytics-hadoop
+job_tracker                       = resourcemanager.analytics.eqiad.wmnet:8032
+queue_name                        = default
+
+user                              = hdfs
+
+# Base path in HDFS to refinery.
+# When submitting this job for production, you should
+# override this to point directly at a deployed
+# directory name, and not the 'symbolic' 'current' directory.
+# E.g.  /wmf/refinery/2015-01-05T17.59.18Z--7bb7f07
+refinery_directory                = ${name_node}/wmf/refinery/current
+
+# HDFS path to the refinery job jar that will be used by this job.
+refinery_cassandra_jar_path       = ${refinery_directory}/artifacts/org/wikimedia/analytics/refinery/refinery-cassandra-0.0.35.jar
+cassandra_reducer_class           = org.wikimedia.analytics.refinery.cassandra.ReducerToCassandra
+cassandra_output_format_class     = org.wikimedia.analytics.refinery.cassandra.CqlOutputFormat
+
+# Base path in HDFS to oozie files.
+# Other files will be used relative to this path.
+oozie_directory                   = ${refinery_directory}/oozie
+
+# HDFS path to workflows to run.
+workflow_file                     = ${oozie_directory}/cassandra/historical/workflow.xml
+
+# Workflow to send an error email
+send_error_email_workflow_file    = ${oozie_directory}/util/send_error_email/workflow.xml
+
+# HDFS path to hive-site.xml file.  This is needed to run hive actions.
+hive_site_xml                     = ${name_node}/user/hive/hive-site.xml
+# Temporary directory
+temporary_directory               = ${name_node}/tmp
+
+# Cassandra cluster info
+# NOTE(review): the credentials below are committed in plain text; consider supplying them with -D at submission instead.
+cassandra_host                    = aqs1004-a.eqiad.wmnet
+cassandra_port                    = 9042
+cassandra_username                = aqsloader
+cassandra_password                = cassandra
+cassandra_nodes                   = 6
+batch_size                        = 1024
+
+# Hive value separator
+hive_value_separator              = \\t
+# Cassandra table to be loaded (not job dependent)
+cassandra_table                   = data
+
+# Constant field names and values to be loaded into cassandra
+constant_output_domain_field      = _domain
+constant_output_domain_value      = analytics.wikimedia.org,text
+constant_output_granularity_field = granularity
+constant_output_granularity_value = hourly,text
+constant_output_tid_field         = _tid
+constant_output_tid_value         = 0,timeuuid
+
+# Hive script to run and the tables it reads; the workflow passes them to the script as source_table_1 / source_table_2.
+hive_script                       = projectcounts_per_project.hql
+source_table_1                    = wmf.projectcounts_raw
+source_table_2                    = wmf.abbreviation_domain_map
+
+cassandra_parallel_loaders        = 1
+cassandra_keyspace                = local_group_default_T_pagecounts_per_project
+cassandra_cql                     = UPDATE "${cassandra_keyspace}"."data" SET "v" = ?
+hive_fields                       = project,access,timestamp,v
+hive_fields_types                 = text,text,text,bigint
+cassandra_fields                  = v
+cassandra_primary_keys            = _domain,project,access,granularity,timestamp,_tid
+
+# Workflow to start.
+oozie.wf.application.path         = ${workflow_file}
+oozie.use.system.libpath          = true
+oozie.action.external.stats.write = true
diff --git a/oozie/cassandra/historical/workflow.xml 
b/oozie/cassandra/historical/workflow.xml
new file mode 100644
index 0000000..698f639
--- /dev/null
+++ b/oozie/cassandra/historical/workflow.xml
@@ -0,0 +1,461 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<workflow-app xmlns="uri:oozie:workflow:0.4"
+    name="cassandra-historical-wf-${cassandra_keyspace}">
+
+    <!--
+        Loads the output of a hive script into cassandra:
+          1. prepare_data runs the hive script, which writes the lines to
+             load into a temporary HDFS directory.
+          2. load_cassandra runs a map-reduce job whose reducers send those
+             lines to cassandra.
+          3. remove_temporary_data deletes the temporary directory.
+        A failing action hands over to send_error_email and the workflow is
+        then killed; the temporary directory is left in place in that case.
+    -->
+
+    <parameters>
+
+        <!-- Default values for inner oozie settings -->
+        <property>
+            <name>oozie_launcher_queue_name</name>
+            <value>${queue_name}</value>
+        </property>
+        <property>
+            <name>oozie_launcher_memory</name>
+            <value>256</value>
+        </property>
+
+        <!-- Required properties (no default value: the job configuration must provide them) -->
+        <property><name>queue_name</name></property>
+        <property><name>name_node</name></property>
+        <property><name>job_tracker</name></property>
+
+        <property>
+            <name>hive_script</name>
+            <description>Hive script to run.</description>
+        </property>
+        <property>
+            <name>hive_site_xml</name>
+            <description>hive-site.xml file path in HDFS</description>
+        </property>
+        <property>
+            <name>source_table_1</name>
+            <description>Main hive table to use</description>
+        </property>
+        <property>
+            <name>source_table_2</name>
+            <description>Secondary hive table to use</description>
+        </property>
+        <property>
+            <name>temporary_directory</name>
+            <description>A temporary directory to store data.</description>
+        </property>
+        <property>
+            <name>send_error_email_workflow_file</name>
+            <description>Workflow for sending an email</description>
+        </property>
+
+        <!-- cassandra loader -->
+        <property>
+            <name>refinery_cassandra_jar_path</name>
+            <description>The refinery-cassandra jar file path in HDFS</description>
+        </property>
+        <property>
+            <name>cassandra_reducer_class</name>
+            <description>The reducer class to be used in refinery-cassandra jar</description>
+        </property>
+        <property>
+            <name>cassandra_output_format_class</name>
+            <description>The output format class to be used in refinery-cassandra jar</description>
+        </property>
+        <property>
+            <name>cassandra_parallel_loaders</name>
+            <description>The number of reducers to parallel load cassandra</description>
+        </property>
+        <property>
+            <name>cassandra_nodes</name>
+            <description>The number of nodes of the cassandra cluster</description>
+        </property>
+        <property>
+            <name>batch_size</name>
+            <description>The size of a batch sent to a node for insertion</description>
+        </property>
+
+        <!-- cassandra settings -->
+        <property>
+            <name>cassandra_host</name>
+            <description>The cassandra host to connect for load</description>
+        </property>
+        <property>
+            <name>cassandra_port</name>
+            <description>The cassandra port to connect for load</description>
+        </property>
+        <property>
+            <name>cassandra_username</name>
+            <description>The cassandra username for load</description>
+        </property>
+        <property>
+            <name>cassandra_password</name>
+            <description>The cassandra password for load</description>
+        </property>
+
+        <!-- Cassandra load job parameters-->
+        <property>
+            <name>hive_fields</name>
+            <description>Hive output fields names</description>
+        </property>
+        <property>
+            <name>hive_fields_types</name>
+            <description>Hive output fields types</description>
+        </property>
+        <property>
+            <name>hive_value_separator</name>
+            <description>Separator between the values of a hive output line</description>
+        </property>
+
+        <property>
+            <name>cassandra_cql</name>
+            <description>The cassandra CQL used to load</description>
+        </property>
+        <property>
+            <name>cassandra_keyspace</name>
+            <description>Keyspace to load</description>
+        </property>
+        <property>
+            <name>cassandra_table</name>
+            <description>Table to load</description>
+        </property>
+        <property>
+            <name>cassandra_fields</name>
+            <description>Fields from hive_fields to be loaded</description>
+        </property>
+        <property>
+            <name>cassandra_primary_keys</name>
+            <description>Primary keys name in cassandra</description>
+        </property>
+
+        <property>
+            <name>constant_output_domain_field</name>
+            <description>Name of the constant cassandra _domain field</description>
+        </property>
+        <property>
+            <name>constant_output_domain_value</name>
+            <description>Constant value for cassandra _domain field</description>
+        </property>
+        <property>
+            <name>constant_output_granularity_field</name>
+            <description>Name of the constant cassandra granularity field</description>
+        </property>
+        <property>
+            <name>constant_output_granularity_value</name>
+            <description>Constant value for cassandra granularity field</description>
+        </property>
+        <property>
+            <name>constant_output_tid_field</name>
+            <description>Name of the constant cassandra _tid field</description>
+        </property>
+        <property>
+            <name>constant_output_tid_value</name>
+            <description>Constant value for cassandra _tid field</description>
+        </property>
+
+    </parameters>
+
+    <start to="prepare_data"/>
+
+    <!-- Runs the hive script, which writes the lines to load under the temporary directory -->
+    <action name="prepare_data">
+        <hive xmlns="uri:oozie:hive-action:0.2">
+            <job-tracker>${job_tracker}</job-tracker>
+            <name-node>${name_node}</name-node>
+            <job-xml>${hive_site_xml}</job-xml>
+            <configuration>
+                <property>
+                    <name>mapreduce.job.queuename</name>
+                    <value>${queue_name}</value>
+                </property>
+                <!--make sure oozie:launcher runs in a low priority queue -->
+                <property>
+                    <name>oozie.launcher.mapred.job.queue.name</name>
+                    <value>${oozie_launcher_queue_name}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapreduce.map.memory.mb</name>
+                    <value>${oozie_launcher_memory}</value>
+                </property>
+                <!--Let hive decide on the number of reducers -->
+                <property>
+                    <name>mapred.reduce.tasks</name>
+                    <value>-1</value>
+                </property>
+                <property>
+                    <name>hive.exec.scratchdir</name>
+                    <value>/tmp/hive-${user}</value>
+                </property>
+            </configuration>
+
+            <script>${hive_script}</script>
+            <param>source_table_1=${source_table_1}</param>
+            <param>source_table_2=${source_table_2}</param>
+            <param>separator=${hive_value_separator}</param>
+            <param>destination_directory=${temporary_directory}/${wf:id()}-${cassandra_keyspace}</param>
+        </hive>
+
+        <ok to="load_cassandra"/>
+        <error to="send_error_email"/>
+    </action>
+
+    <!-- Map-reduce job: pass-through mapper over the hive output, configured reducer and output format load cassandra -->
+    <action name="load_cassandra">
+        <map-reduce>
+            <job-tracker>${job_tracker}</job-tracker>
+            <name-node>${name_node}</name-node>
+            <configuration>
+
+                <!-- Global Configuration -->
+
+                <!-- Ensure classpath jars are loading in correct order-->
+                <property>
+                    <name>mapreduce.job.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+
+                <!--Set queue -->
+                <property>
+                    <name>mapreduce.job.queuename</name>
+                    <value>${queue_name}</value>
+                </property>
+
+                <!--make sure oozie:launcher runs in a low priority queue -->
+                <property>
+                    <name>oozie.launcher.mapred.job.queue.name</name>
+                    <value>${oozie_launcher_queue_name}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapreduce.map.memory.mb</name>
+                    <value>${oozie_launcher_memory}</value>
+                </property>
+
+                <!-- Set mapper and reducer to yarn api style-->
+                <property>
+                    <name>mapred.mapper.new-api</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>mapred.reducer.new-api</name>
+                    <value>true</value>
+                </property>
+
+                <!-- Set logging to INFO -->
+                <property>
+                    <name>mapreduce.map.log.level</name>
+                    <value>INFO</value>
+                </property>
+                <property>
+                    <name>mapreduce.reduce.log.level</name>
+                    <value>INFO</value>
+                </property>
+
+
+                <!-- Map side -->
+                <!-- Default mapper - Nothing done -->
+                <property>
+                    <name>mapreduce.map.class</name>
+                    <value>org.apache.hadoop.mapreduce.Mapper</value>
+                </property>
+                <!-- compress map output -->
+                <property>
+                    <name>mapreduce.map.output.compress</name>
+                    <value>true</value>
+                </property>
+                <!-- input format and classes -->
+                <property>
+                    <name>mapreduce.job.inputformat.class</name>
+                    <value>org.apache.hadoop.mapreduce.lib.input.TextInputFormat</value>
+                </property>
+                <property>
+                    <name>mapreduce.job.input.key.class</name>
+                    <value>org.apache.hadoop.io.LongWritable</value>
+                </property>
+                <property>
+                    <name>mapreduce.job.input.value.class</name>
+                    <value>org.apache.hadoop.io.Text</value>
+                </property>
+                <!-- map output classes-->
+                <property>
+                    <name>mapreduce.map.output.key.class</name>
+                    <value>org.apache.hadoop.io.LongWritable</value>
+                </property>
+                <property>
+                    <name>mapreduce.map.output.value.class</name>
+                    <value>org.apache.hadoop.io.Text</value>
+                </property>
+                <!-- input dir -->
+                <property>
+                    <name>mapreduce.input.fileinputformat.inputdir</name>
+                    <value>${temporary_directory}/${wf:id()}-${cassandra_keyspace}</value>
+                </property>
+
+
+                <!-- Reduce side -->
+                <!-- Load to Cassandra reducer with X reducers-->
+                <property>
+                    <name>mapreduce.reduce.class</name>
+                    <value>${cassandra_reducer_class}</value>
+                </property>
+                <property>
+                    <name>mapreduce.job.reduces</name>
+                    <value>${cassandra_parallel_loaders}</value>
+                </property>
+                <property>
+                    <name>mapreduce.reduce.speculative</name>
+                    <value>false</value>
+                </property>
+
+                <!-- Cassandra output format and classes -->
+                <property>
+                    <name>mapreduce.job.outputformat.class</name>
+                    <value>${cassandra_output_format_class}</value>
+                </property>
+                <property>
+                    <name>mapreduce.job.output.key.class</name>
+                    <value>java.util.Map</value>
+                </property>
+                <property>
+                    <name>mapreduce.job.output.value.class</name>
+                    <value>java.util.List</value>
+                </property>
+
+                <!-- Cassandra reducer core config-->
+                <!-- Cassandra host - legacy naming-->
+                <property>
+                    <name>cassandra.output.thrift.address</name>
+                    <value>${cassandra_host}</value>
+                </property>
+                <property>
+                    <name>cassandra.output.native.port</name>
+                    <value>${cassandra_port}</value>
+                </property>
+                <property>
+                    <name>cassandra.input.native.auth.provider</name>
+                    <value>com.datastax.driver.core.PlainTextAuthProvider</value>
+                </property>
+                <property>
+                    <name>cassandra.username</name>
+                    <value>${cassandra_username}</value>
+                </property>
+                <property>
+                    <name>cassandra.password</name>
+                    <value>${cassandra_password}</value>
+                </property>
+                <property>
+                    <!--Quoted on purpose -->
+                    <name>cassandra.output.keyspace</name>
+                    <value>"${cassandra_keyspace}"</value>
+                </property>
+                <property>
+                    <!-- actually used for column family - quoted on purpose-->
+                    <name>mapreduce.output.basename</name>
+                    <value>"${cassandra_table}"</value>
+                </property>
+                <property>
+                    <name>cassandra.output.partitioner.class</name>
+                    <value>Murmur3Partitioner</value>
+                </property>
+                <property>
+                    <name>cassandra.output.cql</name>
+                    <value>${cassandra_cql}</value>
+                </property>
+                <property>
+                    <name>mapreduce.output.columnfamilyoutputformat.batch.threshold</name>
+                    <value>${batch_size}</value>
+                </property>
+                <property>
+                    <name>mapreduce.output.columnfamilyoutputformat.queue.size</name>
+                    <value>${batch_size * cassandra_nodes}</value>
+                </property>
+
+
+                <!-- Cassandra reducer specific config-->
+                <property>
+                    <name>input_separator</name>
+                    <value>${hive_value_separator}</value>
+                </property>
+                <property>
+                    <name>input_fields</name>
+                    <value>${hive_fields}</value>
+                </property>
+                <property>
+                    <name>input_fields_types</name>
+                    <value>${hive_fields_types}</value>
+                </property>
+                <property>
+                    <name>output_fields</name>
+                    <value>${cassandra_fields}</value>
+                </property>
+                <property>
+                    <name>output_primary_keys</name>
+                    <value>${cassandra_primary_keys}</value>
+                </property>
+                <property>
+                    <name>${constant_output_domain_field}</name>
+                    <value>${constant_output_domain_value}</value>
+                </property>
+                <property>
+                    <name>${constant_output_granularity_field}</name>
+                    <value>${constant_output_granularity_value}</value>
+                </property>
+                <property>
+                    <name>${constant_output_tid_field}</name>
+                    <value>${constant_output_tid_value}</value>
+                </property>
+            </configuration>
+            <archive>${refinery_cassandra_jar_path}</archive>
+
+        </map-reduce>
+        <ok to="remove_temporary_data"/>
+        <error to="send_error_email"/>
+    </action>
+
+    <!-- Deletes the temporary hive output; only reached when load_cassandra succeeded -->
+    <action name="remove_temporary_data">
+        <fs>
+            <delete path="${temporary_directory}/${wf:id()}-${cassandra_keyspace}"/>
+        </fs>
+        <ok to="end"/>
+        <error to="send_error_email"/>
+    </action>
+
+    <!-- Passes the failed action's name, error code and message to the error email sub-workflow, then kills the workflow -->
+    <action name="send_error_email">
+        <sub-workflow>
+            <app-path>${send_error_email_workflow_file}</app-path>
+            <propagate-configuration/>
+            <configuration>
+                <property>
+                    <name>parent_name</name>
+                    <value>${wf:name()}</value>
+                </property>
+                <property>
+                    <name>parent_failed_action</name>
+                    <value>${wf:lastErrorNode()}</value>
+                </property>
+                <property>
+                    <name>parent_error_code</name>
+                    <value>${wf:errorCode(wf:lastErrorNode())}</value>
+                </property>
+                <property>
+                    <name>parent_error_message</name>
+                    <value>${wf:errorMessage(wf:lastErrorNode())}</value>
+                </property>
+            </configuration>
+        </sub-workflow>
+        <ok to="kill"/>
+        <error to="kill"/>
+    </action>
+
+    <kill name="kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+    <end name="end"/>
+</workflow-app>

-- 
To view, visit https://gerrit.wikimedia.org/r/339421
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I734b54d3bd95d373b76163d3caac7c90a07e143e
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: Mforns <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to