Mforns has uploaded a new change for review. (
https://gerrit.wikimedia.org/r/339421 )
Change subject: Add oozie workflow to load projectcounts to AQS
......................................................................
Add oozie workflow to load projectcounts to AQS
Bug: T156388
Change-Id: I734b54d3bd95d373b76163d3caac7c90a07e143e
---
A oozie/cassandra/historical/projectcounts_per_project.hql
A oozie/cassandra/historical/projectcounts_per_project.properties
A oozie/cassandra/historical/workflow.xml
3 files changed, 577 insertions(+), 0 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery
refs/changes/21/339421/1
diff --git a/oozie/cassandra/historical/projectcounts_per_project.hql
b/oozie/cassandra/historical/projectcounts_per_project.hql
new file mode 100644
index 0000000..e57f9f5
--- /dev/null
+++ b/oozie/cassandra/historical/projectcounts_per_project.hql
@@ -0,0 +1,69 @@
+-- Parameters:
+-- destination_directory -- HDFS path to write output files
+-- source_table_1 -- Fully qualified projectcounts table
+-- source_table_2 -- Fully qualified abbreviation map table
+-- separator -- Separator for values
+--
+-- Usage:
+-- hive -f projectcounts_per_project.hql \
+-- -d destination_directory=/tmp/projectcounts_per_project \
+-- -d source_table_1=wmf.projectcounts_raw \
+-- -d source_table_2=wmf.abbreviation_domain_map \
+-- -d separator=\t
+--
+
+
+SET hive.exec.compress.output=true;
+SET
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
+
+
+INSERT OVERWRITE DIRECTORY "${destination_directory}"
+    -- Since "ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '" only
+ -- works for exports to local directories (see HIVE-5672), we have to
+ -- prepare the lines by hand through concatenation :-(
+ SELECT
+ CONCAT_WS('${separator}',
+ COALESCE(hostname, 'all-projects'),
+ COALESCE(IF(access_site IN ('mobile', 'zero'), 'mobile-web',
access_site), 'all-access'),
+ CONCAT(LPAD(year, 4, "0"), LPAD(month, 2, "0"), LPAD(day, 2, "0"),
LPAD(hour, 2, "0")),
+ CAST(SUM(view_count) AS STRING))
+ FROM
+ ${source_table_1}
+ INNER JOIN
+ ${source_table_2}
+ ON
+ wiki_code = abbreviation
+ GROUP BY
+ hostname,
+ access_site,
+ year,
+ month,
+ day,
+ hour
+ GROUPING SETS (
+ (
+ hostname,
+ access_site,
+ year,
+ month,
+ day,
+ hour
+ ),(
+ hostname,
+ year,
+ month,
+ day,
+ hour
+ ),(
+ access_site,
+ year,
+ month,
+ day,
+ hour
+ ),(
+ year,
+ month,
+ day,
+ hour
+ )
+ );
diff --git a/oozie/cassandra/historical/projectcounts_per_project.properties
b/oozie/cassandra/historical/projectcounts_per_project.properties
new file mode 100644
index 0000000..fbd49cb
--- /dev/null
+++ b/oozie/cassandra/historical/projectcounts_per_project.properties
@@ -0,0 +1,78 @@
+# Configures a workflow to manage loading cassandra for the per_project
+# historical pageview API. Any of the following properties are overridable with
-D.
+# Usage:
+#     oozie job -Duser=$USER -submit -config
oozie/cassandra/historical/projectcounts_per_project.properties
+#
+# NOTE: The $oozie_directory must be synced to HDFS so that all relevant
+# .xml files exist there when this job is submitted.
+
+
+name_node = hdfs://analytics-hadoop
+job_tracker = resourcemanager.analytics.eqiad.wmnet:8032
+queue_name = default
+
+user = hdfs
+
+# Base path in HDFS to refinery.
+# When submitting this job for production, you should
+# override this to point directly at a deployed
+# directory name, and not the 'symbolic' 'current' directory.
+# E.g. /wmf/refinery/2015-01-05T17.59.18Z--7bb7f07
+refinery_directory = ${name_node}/wmf/refinery/current
+
+# HDFS path to the refinery job jar that will be used by this job.
+refinery_cassandra_jar_path =
${refinery_directory}/artifacts/org/wikimedia/analytics/refinery/refinery-cassandra-0.0.35.jar
+cassandra_reducer_class =
org.wikimedia.analytics.refinery.cassandra.ReducerToCassandra
+cassandra_output_format_class =
org.wikimedia.analytics.refinery.cassandra.CqlOutputFormat
+
+# Base path in HDFS to oozie files.
+# Other files will be used relative to this path.
+oozie_directory = ${refinery_directory}/oozie
+
+# HDFS path to workflows to run.
+workflow_file =
${oozie_directory}/cassandra/historical/workflow.xml
+
+# Workflow to send an error email
+send_error_email_workflow_file =
${oozie_directory}/util/send_error_email/workflow.xml
+
+# HDFS path to hive-site.xml file. This is needed to run hive actions.
+hive_site_xml = ${name_node}/user/hive/hive-site.xml
+# Temporary directory
+temporary_directory = ${name_node}/tmp
+
+# Cassandra cluster info
+cassandra_host = aqs1004-a.eqiad.wmnet
+cassandra_port = 9042
+cassandra_username = aqsloader
+cassandra_password = cassandra
+cassandra_nodes = 6
+batch_size = 1024
+
+# Hive value separator
+hive_value_separator = \\t
+# Cassandra table to be loaded (not job dependent)
+cassandra_table = data
+
+# Constant field names and value to be loaded into cassandra
+constant_output_domain_field = _domain
+constant_output_domain_value = analytics.wikimedia.org,text
+constant_output_granularity_field = granularity
+constant_output_tid_field = _tid
+constant_output_tid_value = 0,timeuuid
+
+hive_script = projectcounts_per_project.hql
+projectcounts_table = wmf.projectcounts_raw
+abbreviation_table = wmf.abbreviation_domain_map
+
+cassandra_parallel_loaders = 1
+cassandra_keyspace =
local_group_default_T_pagecounts_per_project
+cassandra_cql = UPDATE "${cassandra_keyspace}"."data" SET
"v" = ?
+hive_fields = project,access,timestamp,v
+hive_fields_types = text,text,text,bigint
+cassandra_fields = v
+cassandra_primary_keys =
_domain,project,access,granularity,timestamp,_tid
+
+# Workflow to start.
+oozie.wf.application.path = ${workflow_file}
+oozie.use.system.libpath = true
+oozie.action.external.stats.write = true
diff --git a/oozie/cassandra/historical/workflow.xml
b/oozie/cassandra/historical/workflow.xml
new file mode 100644
index 0000000..698f639
--- /dev/null
+++ b/oozie/cassandra/historical/workflow.xml
@@ -0,0 +1,430 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<workflow-app xmlns="uri:oozie:workflow:0.4"
+ name="cassandra-historical-wf-${cassandra_keyspace}">
+
+ <parameters>
+
+ <!-- Default values for inner oozie settings -->
+ <property>
+ <name>oozie_launcher_queue_name</name>
+ <value>${queue_name}</value>
+ </property>
+ <property>
+ <name>oozie_launcher_memory</name>
+ <value>256</value>
+ </property>
+
+ <!-- Required properties -->
+ <property><name>queue_name</name></property>
+ <property><name>name_node</name></property>
+ <property><name>job_tracker</name></property>
+
+ <property>
+ <name>hive_script</name>
+ <description>Hive script to run.</description>
+ </property>
+ <property>
+ <name>hive_site_xml</name>
+ <description>hive-site.xml file path in HDFS</description>
+ </property>
+ <property>
+ <name>source_table_1</name>
+ <description>Main hive table to use</description>
+ </property>
+ <property>
+ <name>source_table_2</name>
+ <description>Secondary hive table to use</description>
+ </property>
+ <property>
+ <name>temporary_directory</name>
+ <description>A temporary directory to store data.</description>
+ </property>
+ <property>
+ <name>send_error_email_workflow_file</name>
+ <description>Workflow for sending an email</description>
+ </property>
+
+ <!-- cassandra loader -->
+ <property>
+ <name>refinery_cassandra_jar_path</name>
+ <description>The refinery-cassandra jar file path in
HDFS</description>
+ </property>
+ <property>
+ <name>cassandra_reducer_class</name>
+ <description>The reducer class to be used in refinery-cassandra
jar</description>
+ </property>
+ <property>
+ <name>cassandra_output_format_class</name>
+ <description>The output format class to be used in
refinery-cassandra jar</description>
+ </property>
+ <property>
+ <name>cassandra_parallel_loaders</name>
+ <description>The number of reducers to parallel load
cassandra</description>
+ </property>
+ <property>
+ <name>cassandra_nodes</name>
+ <description>The number of nodes of the cassandra
cluster</description>
+ </property>
+ <property>
+ <name>batch_size</name>
+ <description>The size of a batch sent to a node for
insertion</description>
+ </property>
+
+ <!-- cassandra settings -->
+ <property>
+ <name>cassandra_host</name>
+ <description>The cassandra host to connect for load</description>
+ </property>
+ <property>
+ <name>cassandra_port</name>
+ <description>The cassandra port to connect for load</description>
+ </property>
+ <property>
+ <name>cassandra_username</name>
+ <description>The cassandra username for load</description>
+ </property>
+ <property>
+ <name>cassandra_password</name>
+ <description>The cassandra password for load</description>
+ </property>
+
+ <!-- Cassandra load job parameters-->
+ <property>
+ <name>hive_fields</name>
+ <description>Hive output fields names</description>
+ </property>
+ <property>
+ <name>hive_fields_types</name>
+ <description>Hive output fields types</description>
+ </property>
+
+ <property>
+ <name>cassandra_cql</name>
+ <description>The cassandra CQL used to load</description>
+ </property>
+ <property>
+ <name>cassandra_keyspace</name>
+ <description>Keyspace to load</description>
+ </property>
+ <property>
+ <name>cassandra_table</name>
+ <description>Table to load</description>
+ </property>
+ <property>
+ <name>cassandra_fields</name>
+ <description>Fields from hive_fields to be loaded</description>
+ </property>
+ <property>
+ <name>cassandra_primary_keys</name>
+ <description>Primary keys name in cassandra</description>
+ </property>
+
+ <property>
+ <name>constant_output_domain_value</name>
+ <description>Constant value for cassandra _domain
field</description>
+ </property>
+ <property>
+ <name>constant_output_granularity_value</name>
+ <description>Constant value for cassandra granularity
field</description>
+ </property>
+ <property>
+ <name>constant_output_tid_value</name>
+ <description>Constant value for cassandra _tid field</description>
+ </property>
+
+ </parameters>
+
+ <start to="prepare_data"/>
+
+ <action name="prepare_data">
+ <hive xmlns="uri:oozie:hive-action:0.2">
+ <job-tracker>${job_tracker}</job-tracker>
+ <name-node>${name_node}</name-node>
+ <job-xml>${hive_site_xml}</job-xml>
+ <configuration>
+ <property>
+ <name>mapreduce.job.queuename</name>
+ <value>${queue_name}</value>
+ </property>
+ <!--make sure oozie:launcher runs in a low priority queue -->
+ <property>
+ <name>oozie.launcher.mapred.job.queue.name</name>
+ <value>${oozie_launcher_queue_name}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapreduce.map.memory.mb</name>
+ <value>${oozie_launcher_memory}</value>
+ </property>
+ <!--Let hive decide on the number of reducers -->
+ <property>
+ <name>mapred.reduce.tasks</name>
+ <value>-1</value>
+ </property>
+ <property>
+ <name>hive.exec.scratchdir</name>
+ <value>/tmp/hive-${user}</value>
+ </property>
+ </configuration>
+
+ <script>${hive_script}</script>
+ <param>source_table_1=${source_table_1}</param>
+ <param>source_table_2=${source_table_2}</param>
+ <param>separator=${hive_value_separator}</param>
+
<param>destination_directory=${temporary_directory}/${wf:id()}-${cassandra_keyspace}</param>
+ </hive>
+
+ <ok to="load_cassandra"/>
+ <error to="send_error_email"/>
+ </action>
+
+ <action name="load_cassandra">
+ <map-reduce>
+ <job-tracker>${job_tracker}</job-tracker>
+ <name-node>${name_node}</name-node>
+ <configuration>
+
+ <!-- Global Configuration -->
+
+ <!-- Ensure classpath jars are loading in correct order-->
+ <property>
+ <name>mapreduce.job.user.classpath.first</name>
+ <value>true</value>
+ </property>
+
+ <!--Set queue -->
+ <property>
+ <name>mapreduce.job.queuename</name>
+ <value>${queue_name}</value>
+ </property>
+
+ <!--make sure oozie:launcher runs in a low priority queue -->
+ <property>
+ <name>oozie.launcher.mapred.job.queue.name</name>
+ <value>${oozie_launcher_queue_name}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapreduce.map.memory.mb</name>
+ <value>${oozie_launcher_memory}</value>
+ </property>
+
+ <!-- Set mapper and reducer to yarn api style-->
+ <property>
+ <name>mapred.mapper.new-api</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>mapred.reducer.new-api</name>
+ <value>true</value>
+ </property>
+
+ <!-- Set logging to INFO -->
+ <property>
+ <name>mapreduce.map.log.level</name>
+ <value>INFO</value>
+ </property>
+ <property>
+ <name>mapreduce.reduce.log.level</name>
+ <value>INFO</value>
+ </property>
+
+
+ <!-- Map side -->
+ <!-- Default mapper - Nothing done -->
+ <property>
+ <name>mapreduce.map.class</name>
+ <value>org.apache.hadoop.mapreduce.Mapper</value>
+ </property>
+ <!-- compress map output -->
+ <property>
+ <name>mapreduce.map.output.compress</name>
+ <value>true</value>
+ </property>
+ <!-- input format and classes -->
+ <property>
+ <name>mapreduce.job.inputformat.class</name>
+
<value>org.apache.hadoop.mapreduce.lib.input.TextInputFormat</value>
+ </property>
+ <property>
+ <name>mapreduce.job.input.key.class</name>
+ <value>org.apache.hadoop.io.LongWritable</value>
+ </property>
+ <property>
+ <name>mapreduce.job.input.value.class</name>
+ <value>org.apache.hadoop.io.Text</value>
+ </property>
+ <!-- map output classes-->
+ <property>
+ <name>mapreduce.map.output.key.class</name>
+ <value>org.apache.hadoop.io.LongWritable</value>
+ </property>
+ <property>
+ <name>mapreduce.map.output.value.class</name>
+ <value>org.apache.hadoop.io.Text</value>
+ </property>
+ <!-- input dir -->
+ <property>
+ <name>mapreduce.input.fileinputformat.inputdir</name>
+
<value>${temporary_directory}/${wf:id()}-${cassandra_keyspace}</value>
+ </property>
+
+
+ <!-- Reduce side -->
+ <!-- Load to Cassandra reducer with X reducers-->
+ <property>
+ <name>mapreduce.reduce.class</name>
+ <value>${cassandra_reducer_class}</value>
+ </property>
+ <property>
+ <name>mapreduce.job.reduces</name>
+ <value>${cassandra_parallel_loaders}</value>
+ </property>
+ <property>
+ <name>mapreduce.reduce.speculative</name>
+ <value>false</value>
+ </property>
+
+ <!-- Cassandra output format and classes -->
+ <property>
+ <name>mapreduce.job.outputformat.class</name>
+ <value>${cassandra_output_format_class}</value>
+ </property>
+ <property>
+ <name>mapreduce.job.output.key.class</name>
+ <value>java.util.Map</value>
+ </property>
+ <property>
+ <name>mapreduce.job.output.value.class</name>
+ <value>java.util.List</value>
+ </property>
+
+ <!-- Cassandra reducer core config-->
+ <!-- Cassandra host - legacy naming-->
+ <property>
+ <name>cassandra.output.thrift.address</name>
+ <value>${cassandra_host}</value>
+ </property>
+ <property>
+ <name>cassandra.output.native.port</name>
+ <value>${cassandra_port}</value>
+ </property>
+ <property>
+ <name>cassandra.input.native.auth.provider</name>
+
<value>com.datastax.driver.core.PlainTextAuthProvider</value>
+ </property>
+ <property>
+ <name>cassandra.username</name>
+ <value>${cassandra_username}</value>
+ </property>
+ <property>
+ <name>cassandra.password</name>
+ <value>${cassandra_password}</value>
+ </property>
+ <property>
+ <!--Quoted on purpose -->
+ <name>cassandra.output.keyspace</name>
+ <value>"${cassandra_keyspace}"</value>
+ </property>
+ <property>
+                    <!-- actually used for column family - quoted on
purpose-->
+ <name>mapreduce.output.basename</name>
+ <value>"${cassandra_table}"</value>
+ </property>
+ <property>
+ <name>cassandra.output.partitioner.class</name>
+ <value>Murmur3Partitioner</value>
+ </property>
+ <property>
+ <name>cassandra.output.cql</name>
+ <value>${cassandra_cql}</value>
+ </property>
+ <property>
+
<name>mapreduce.output.columnfamilyoutputformat.batch.threshold</name>
+ <value>${batch_size}</value>
+ </property>
+ <property>
+
<name>mapreduce.output.columnfamilyoutputformat.queue.size</name>
+ <value>${batch_size * cassandra_nodes}</value>
+ </property>
+
+
+ <!-- Cassandra reducer specific config-->
+ <property>
+ <name>input_separator</name>
+ <value>${hive_value_separator}</value>
+ </property>
+ <property>
+ <name>input_fields</name>
+ <value>${hive_fields}</value>
+ </property>
+ <property>
+ <name>input_fields_types</name>
+ <value>${hive_fields_types}</value>
+ </property>
+ <property>
+ <name>output_fields</name>
+ <value>${cassandra_fields}</value>
+ </property>
+ <property>
+ <name>output_primary_keys</name>
+ <value>${cassandra_primary_keys}</value>
+ </property>
+ <property>
+ <name>${constant_output_domain_field}</name>
+ <value>${constant_output_domain_value}</value>
+ </property>
+ <property>
+ <name>${constant_output_granularity_field}</name>
+ <value>${constant_output_granularity_value}</value>
+ </property>
+ <property>
+ <name>${constant_output_tid_field}</name>
+ <value>${constant_output_tid_value}</value>
+ </property>
+ </configuration>
+ <archive>${refinery_cassandra_jar_path}</archive>
+
+ </map-reduce>
+ <ok to="remove_temporary_data"/>
+ <error to="send_error_email"/>
+ </action>
+
+ <action name="remove_temporary_data">
+ <fs>
+ <delete
path="${temporary_directory}/${wf:id()}-${cassandra_keyspace}"/>
+ </fs>
+ <ok to="end"/>
+ <error to="send_error_email"/>
+ </action>
+
+ <action name="send_error_email">
+ <sub-workflow>
+ <app-path>${send_error_email_workflow_file}</app-path>
+ <propagate-configuration/>
+ <configuration>
+ <property>
+ <name>parent_name</name>
+ <value>${wf:name()}</value>
+ </property>
+ <property>
+ <name>parent_failed_action</name>
+ <value>${wf:lastErrorNode()}</value>
+ </property>
+ <property>
+ <name>parent_error_code</name>
+ <value>${wf:errorCode(wf:lastErrorNode())}</value>
+ </property>
+ <property>
+ <name>parent_error_message</name>
+ <value>${wf:errorMessage(wf:lastErrorNode())}</value>
+ </property>
+ </configuration>
+ </sub-workflow>
+ <ok to="kill"/>
+ <error to="kill"/>
+ </action>
+
+ <kill name="kill">
+ <message>Action failed, error
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+ </kill>
+ <end name="end"/>
+</workflow-app>
--
To view, visit https://gerrit.wikimedia.org/r/339421
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I734b54d3bd95d373b76163d3caac7c90a07e143e
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: Mforns <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits