Ottomata has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/349723 )
Change subject: Add --generate-jar and --jar-file options
......................................................................
Add --generate-jar and --jar-file options
This change adds an option to generate a jar file containing all the
ORM Java code that sqoop would otherwise have to generate by querying the
source database, and an option to use that generated jar. It also
adds the jar that was generated during testing to artifacts/
Open question: sqoop accepts the --jar-file parameter and uses it:
    INFO tool.CodeGenTool: Using existing jar: ...jar
However, it still executes a SQL query and generates the avsc file
for each table. I wonder if this is just unavoidable.
Deployment plan:
* (DONE) generate the jar, commit and merge to this repo
* (DONE) test running the sqoop with a --jar-file
* change the sqoop cron in puppet to use the new jar
NOTE: to compile these ORM .java files manually, the command is:
  javac -classpath \
    '/usr/lib/hadoop/*:/usr/lib/hadoop-0.20-mapreduce/*:/usr/lib/hadoop-hdfs/*:/usr/lib/sqoop/*' \
    <<pattern-for-all-your-table-ORM-files>>.java
(the classpath is quoted so the shell passes the wildcards through to javac)
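After compiling, the change bundles the resulting classes into a jar with the `jar` tool. Since a jar is just a zip archive, that bundling step can be sketched in Python with the standard `zipfile` module. This is only an illustration of what the bundling does; the function name `bundle_orm_jar` is invented for the example, and the script itself shells out to `jar cf`:

```python
import os
import zipfile


def bundle_orm_jar(class_dir, jar_path):
    """Bundle every compiled .class file under class_dir into a jar.

    A jar is a zip archive, so zipfile is enough for a plain
    (manifest-less) bundle like the one this change commits.
    """
    with zipfile.ZipFile(jar_path, 'w', zipfile.ZIP_DEFLATED) as jar:
        for root, _dirs, files in os.walk(class_dir):
            for name in files:
                if name.endswith('.class'):
                    full = os.path.join(root, name)
                    # store paths relative to class_dir, like 'jar cf ... -C class_dir .'
                    jar.write(full, os.path.relpath(full, class_dir))
    return jar_path
```
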
Bug: T143119
Change-Id: I984ce686f65ac61802d07812a6f17dba9a92fe3e
---
A artifacts/mediawiki-tables-sqoop-orm.jar
M bin/sqoop-mediawiki-tables
M setup.cfg
3 files changed, 100 insertions(+), 51 deletions(-)
Approvals:
Ottomata: Verified; Looks good to me, approved
Joal: Looks good to me, but someone else must approve
diff --git a/artifacts/mediawiki-tables-sqoop-orm.jar b/artifacts/mediawiki-tables-sqoop-orm.jar
new file mode 100644
index 0000000..0e227df
--- /dev/null
+++ b/artifacts/mediawiki-tables-sqoop-orm.jar
@@ -0,0 +1 @@
+#$# git-fat f17fd1e2705c0c5fec37fc92d9a05f302dd9fbf4 202764
diff --git a/bin/sqoop-mediawiki-tables b/bin/sqoop-mediawiki-tables
index 200523d..a2571ae 100755
--- a/bin/sqoop-mediawiki-tables
+++ b/bin/sqoop-mediawiki-tables
@@ -33,7 +33,8 @@
[--verbose] --wiki-file WIKIS --timestamp TIME
[--max-tries TRIES] [--force] --snapshot SNAPSHOT
[--mappers NUM] [--processors NUM] --user NAME
- [--job-name HADOOP_JOB_NAME] [--labsdb] --password-file FILE
+ [--job-name JOB_NAME] [--labsdb] --password-file FILE
+ [--generate-jar JAR_OUT|--jar-file JAR_IN]
Options:
-h --help Show this help message and exit.
@@ -72,11 +73,22 @@
    -l --labsdb                         Add '_p' postfix to table names for labsdb
    -f --force                          Deleting existing folders before importing
                                        instead of failing
+   -g JAR_OUT --generate-jar JAR_OUT   Instead of running the job, just pick one wiki
+                                       and generate the java classes for each table,
+                                       then bundle them into a JAR for later use.
+                                       Save the generated jar to JAR_OUT, in a file
+                                       named JAR_OUT/mediawiki-tables-sqoop-orm.jar
+                                       NOTE: etwiki will be used to generate ORM classes
+   -r JAR_IN --jar-file JAR_IN         Disable code generation and use a jar file
+                                       with pre-compiled ORM classes.  The class names
+                                       will be convention-based and assumed to be the
+                                       same as running this script with -g
"""
__author__ = 'Dan Andreesu <[email protected]>'
import csv
import logging
+import os
import sys
from docopt import docopt
@@ -84,18 +96,17 @@
from itertools import groupby
from subprocess import check_call
from traceback import format_exc
+from tempfile import mkstemp
from refinery.util import is_yarn_application_running, HdfsUtils
queries = {}
-# NOTE: labsdb excludes fields according to:
-#   https://github.com/wikimedia/operations-software-redactatron/blob/master/scripts/cols.txt
-# But it's more complicated, rows are also redacted, some reading and background:
-#   https://wikitech.wikimedia.org/wiki/MariaDB/Sanitarium_and_Labsdbs
-#   https://phabricator.wikimedia.org/T103011#2296587
-#   https://phabricator.wikimedia.org/T138450
-# TODO: follow up with labs and DBAs to figure out how to redact this load
+print('************ NOTE ************')
+print('When sqooping from labs, resulting data will be shareable with the public '
+      'but when sqooping from production, resulting data may need to be redacted or '
+      'otherwise anonymized before sharing.')
+print('^^^^^^^^^^^^ NOTE ^^^^^^^^^^^^')
def populate_queries(timestamp, labsdb):
@@ -274,7 +285,10 @@
class SqoopConfig:
     def __init__(self, yarn_job_name_prefix, user, password_file, jdbc_host, num_mappers,
-                 table_path_template, dbname, dbpostfix, table, query, map_types, current_try):
+                 table_path_template, dbname, dbpostfix, table, query, map_types,
+                 generate_jar, jar_file,
+                 current_try):
+
self.yarn_job_name_prefix = yarn_job_name_prefix
self.user = user
self.password_file = password_file
@@ -286,8 +300,9 @@
self.table = table
self.query = query
self.map_types = map_types
+ self.generate_jar = generate_jar
+ self.jar_file = jar_file
self.current_try = current_try
-
def sqoop_wiki(config):
@@ -306,28 +321,48 @@
logging.info('STARTING: {}'.format(log_message))
try:
        target_directory = (config.table_path_template + '/wiki_db={db}').format(
- table=config.table, db=config.dbname)
+ table=config.table, db=config.dbname)
+
+ query = config.query
+ command = 'import'
+ if config.generate_jar:
+ query = query + ' and 1=0'
+ command = 'codegen'
+
sqoop_arguments = [
'sqoop',
- 'import',
-
+ command,
'-D' , "mapred.job.name='{}-{}'".format(
config.yarn_job_name_prefix, full_table),
'--username' , config.user,
'--password-file' , config.password_file,
            '--connect'         , config.jdbc_host + config.dbname + config.dbpostfix,
-            # NOTE: using columns/map/where doesn't let you properly decode varbinary
-            # '--table'   , table,
-            # '--columns' , queries[table].get('columns') or '',
-            # '--where'   , queries[table].get('where') or '1=1',
-            '--query'           , config.query,
+            '--query'           , query,
- '--num-mappers' , config.num_mappers,
- '--target-dir' , target_directory,
- '--as-avrodatafile' ,
]
+ if config.generate_jar:
+ sqoop_arguments += [
+ '--class-name' , config.table,
+ '--outdir' , config.generate_jar,
+ '--bindir' , config.generate_jar,
+ ]
+ else:
+ sqoop_arguments += [
+ '--target-dir' , target_directory,
+ '--num-mappers' , config.num_mappers,
+ '--as-avrodatafile' ,
+ ]
+
+ if config.jar_file:
+ sqoop_arguments += [
+ '--class-name' , config.table,
+ '--jar-file' , config.jar_file,
+ ]
+
if config.map_types:
- sqoop_arguments += ['--map-column-java', config.map_types]
+ sqoop_arguments += [
+ '--map-column-java' , config.map_types
+ ]
logging.info('Sqooping with: {}'.format(sqoop_arguments))
check_call(sqoop_arguments)
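The branching above can be condensed into a small helper to see how the two new options change the sqoop invocation. This is an illustrative sketch, not the script's actual code: the function name `build_sqoop_arguments` and its slimmed-down parameter list are invented for the example, and connection/password arguments are omitted:

```python
def build_sqoop_arguments(query, table, user,
                          generate_jar=None, jar_file=None,
                          target_dir=None, num_mappers='4'):
    """Sketch of the argument assembly in sqoop_wiki:

    - --generate-jar switches the subcommand to 'codegen' and appends
      an always-false predicate so no rows are actually imported,
    - --jar-file keeps the 'import' subcommand but points sqoop at
      pre-compiled ORM classes instead of generating code.
    """
    command = 'codegen' if generate_jar else 'import'
    if generate_jar:
        query += ' and 1=0'  # codegen only needs the schema, not the rows

    arguments = ['sqoop', command, '--username', user, '--query', query]
    if generate_jar:
        arguments += ['--class-name', table,
                      '--outdir', generate_jar,
                      '--bindir', generate_jar]
    else:
        arguments += ['--target-dir', target_dir,
                      '--num-mappers', num_mappers,
                      '--as-avrodatafile']
    if jar_file:
        arguments += ['--class-name', table, '--jar-file', jar_file]
    return arguments
```
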
@@ -373,6 +408,8 @@
    num_processors = int(arguments.get('--processors')) if arguments.get('--processors') else None
max_tries = int(arguments.get('--max-tries'))
force = arguments.get('--force')
+ generate_jar = arguments.get('--generate-jar')
+ jar_file = arguments.get('--jar-file')
table_path_template = '{hdfs}/{table}/snapshot={snapshot}'.format(
hdfs=target_hdfs_directory,
@@ -404,31 +441,37 @@
if not check_hdfs_path(table_path_template, force):
sys.exit(1)
-    # read in the wikis to process and sqoop each one
-    with open(db_list_file) as dbs_file:
-        # Remove lines starting with dashes
-        flat_wikis = [row for row in csv.reader(dbs_file) if not row[0].startswith('#')]
+    if generate_jar:
+        flat_wikis = [['etwiki', 1]]
+        jar_path = os.path.join(generate_jar, 'mediawiki-tables-sqoop-orm.jar')
+    else:
+        # read in the wikis to process and sqoop each one
+        with open(db_list_file) as dbs_file:
+            # Remove comment lines starting with '#'
+            flat_wikis = [row for row in csv.reader(dbs_file) if not row[0].startswith('#')]
-    failed_jobs = []
-    for group, wikis in groupby(flat_wikis, lambda w: w[1]):
-        executor_config_list = []
-        for w in wikis:
-            for table in queries.keys():
-                query = queries[table].get('query')
-                map_types = queries[table]['map-types'] if ('map-types' in queries[table]) else None
-
-                executor_config_list.append(SqoopConfig(yarn_job_name_prefix, user, password_file, jdbc_host, num_mappers,
-                                                        table_path_template, w[0], dbpostfix, table, query, map_types, 1))
+    failed_jobs = []
+    for group, wikis in groupby(flat_wikis, lambda w: w[1]):
+        executor_config_list = []
+        for w in wikis:
+            for table in queries.keys():
+                query = queries[table].get('query')
+                map_types = queries[table]['map-types'] if ('map-types' in queries[table]) else None
+                executor_config_list.append(SqoopConfig(yarn_job_name_prefix, user, password_file, jdbc_host, num_mappers,
+                                                        table_path_template, w[0], dbpostfix, table, query, map_types,
+                                                        generate_jar, jar_file, 1))
-        # sqoop all wikis in this group and wait for them all to finish with retry
-        with futures.ProcessPoolExecutor(num_processors) as executor:
-            current_try = 0
-            while (executor_config_list and current_try < max_tries):
-                executor_config_list = filter(None, list(executor.map(sqoop_wiki, executor_config_list)))
-                current_try += 1
-            failed_jobs.extend(executor_config_list)
+        # sqoop all wikis in this group and wait for them all to finish with retry
+        with futures.ProcessPoolExecutor(num_processors) as executor:
+            current_try = 0
+            while (executor_config_list and current_try < max_tries):
+                executor_config_list = filter(None, list(executor.map(sqoop_wiki, executor_config_list)))
+                current_try += 1
+            failed_jobs.extend(executor_config_list)
-    # if there were no failures at all, write a success flag to this dataset
-    if not failed_jobs:
+    # if there were no failures at all, write a success flag to this dataset
+    if not failed_jobs:
+        if not generate_jar:
             for table in queries.keys():
                 success_directory = table_path_template.format(table=table)
                 check_call([
@@ -436,10 +479,15 @@
                     success_directory + '/_SUCCESS',
                 ])
         else:
-            to_rerun = ','.join([ ])
-            logging.error('*' * 50)
-            logging.error('* Jobs to re-run: {}'.format(to_rerun))
-            for c in failed_jobs:
-                logging.error('* - {}.{}'.format(c.dbname, c.table))
-            logging.error('*' * 50)
-            sys.exit(1)
+            check_call([
+                'jar', 'cf', jar_path, '-C', generate_jar, '.'
+            ])
+            logging.info('Generated ORM jar at {}'.format(jar_path))
+    else:
+        to_rerun = ','.join(['{}.{}'.format(c.dbname, c.table) for c in failed_jobs])
+        logging.error('*' * 50)
+        logging.error('* Jobs to re-run: {}'.format(to_rerun))
+        for c in failed_jobs:
+            logging.error('* - {}.{}'.format(c.dbname, c.table))
+        logging.error('*' * 50)
+        sys.exit(1)
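The retry loop in this file (submit the group, keep whatever failed, resubmit until max_tries) can be exercised in isolation. This sketch replaces the process pool and the sqoop call with a toy worker; all names here (`retry_map`, `flaky`) are invented for the illustration:

```python
def retry_map(worker, configs, max_tries):
    """Mimic the script's retry loop: a worker returns None on success
    or the config on failure; failed configs are resubmitted until
    max_tries is reached, and whatever still fails is returned."""
    current_try = 0
    while configs and current_try < max_tries:
        # materialize the results each pass, as the script does with
        # list(...) around executor.map
        configs = [c for c in map(worker, configs) if c is not None]
        current_try += 1
    return configs


# toy worker: fails the first time each config is seen, succeeds after
attempts = {}


def flaky(config):
    attempts[config] = attempts.get(config, 0) + 1
    return None if attempts[config] > 1 else config
```

A worker that always fails leaves its configs in the returned list, which is what feeds the "Jobs to re-run" report above.
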
diff --git a/setup.cfg b/setup.cfg
index 436cceb..f1a37e3 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -6,5 +6,5 @@
# E711: there are valid reasons to do column != None in SQL Alchemy
ignore = F401,E221,E203,E711
# line lengths should be limited but not to 80
-max-line-length = 90
+max-line-length = 140
exclude = .venv,.tox,dist,doc,build,*.egg
--
To view, visit https://gerrit.wikimedia.org/r/349723
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I984ce686f65ac61802d07812a6f17dba9a92fe3e
Gerrit-PatchSet: 3
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: Milimetric <[email protected]>
Gerrit-Reviewer: Joal <[email protected]>
Gerrit-Reviewer: Milimetric <[email protected]>
Gerrit-Reviewer: Nuria <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>