Milimetric has uploaded a new change for review. (
https://gerrit.wikimedia.org/r/349723 )
Change subject: [WIP] Add just-generate-jar and jar-file options
......................................................................
[WIP] Add just-generate-jar and jar-file options
Status: this isn't hard, just a little tedious. Code is a bit of a mess
but conceptually got the generation part down and tested. Just have to
compile the jar and pass that to an actual import to test, and then
make sure it all works in the script. Easy peasy.
Bug: T143119
Change-Id: I984ce686f65ac61802d07812a6f17dba9a92fe3e
---
M bin/sqoop-mediawiki-tables
M setup.cfg
2 files changed, 95 insertions(+), 42 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery
refs/changes/23/349723/1
diff --git a/bin/sqoop-mediawiki-tables b/bin/sqoop-mediawiki-tables
index 200523d..ec6b8cb 100755
--- a/bin/sqoop-mediawiki-tables
+++ b/bin/sqoop-mediawiki-tables
@@ -34,6 +34,7 @@
[--max-tries TRIES] [--force] --snapshot SNAPSHOT
[--mappers NUM] [--processors NUM] --user NAME
[--job-name HADOOP_JOB_NAME] [--labsdb] --password-file FILE
+ [--just-generate-jar JAR_OUTPUT]
Options:
-h --help Show this help message and exit.
@@ -72,6 +73,14 @@
-l --labsdb Add '_p' postfix to table names for
labsdb
-f --force Delete existing folders before
importing
instead of failing
+ -g --just-generate-jar Instead of running the job, just pick
one wiki
+ and generate the Java classes for each
table,
+ then bundle them into a JAR for later
use
+ NOTE: sets --force to True
+ -r --jar-file Disable code generation and use a jar
file
+ with pre-compiled ORM classes. The
class names
+ will be convention-based and assumed
to be the
+ same as running this script with -g
"""
__author__ = 'Dan Andreesu <[email protected]>'
@@ -84,6 +93,7 @@
from itertools import groupby
from subprocess import check_call
from traceback import format_exc
+from tempfile import mkstemp
from refinery.util import is_yarn_application_running, HdfsUtils
@@ -274,7 +284,10 @@
class SqoopConfig:
def __init__(self, yarn_job_name_prefix, user, password_file, jdbc_host,
num_mappers,
- table_path_template, dbname, dbpostfix, table, query,
map_types, current_try):
+ table_path_template, dbname, dbpostfix, table, query,
map_types,
+ just_generate, jar_file, generated_code_dir,
+ current_try):
+
self.yarn_job_name_prefix = yarn_job_name_prefix
self.user = user
self.password_file = password_file
@@ -286,8 +299,10 @@
self.table = table
self.query = query
self.map_types = map_types
+ self.just_generate = just_generate
+ self.jar_file = jar_file
+ self.generated_code_dir = generated_code_dir
self.current_try = current_try
-
def sqoop_wiki(config):
@@ -306,28 +321,42 @@
logging.info('STARTING: {}'.format(log_message))
try:
target_directory = (config.table_path_template +
'/wiki_db={db}').format(
- table=config.table, db=config.dbname)
+ table=config.table, db=config.dbname)
+
+ query = config.query
+ command = 'import'
+ if config.just_generate:
+ query = query + ' and 1=0'
+ command = 'codegen'
+
sqoop_arguments = [
'sqoop',
- 'import',
+ command,
'-D' , "mapred.job.name='{}-{}'".format(
config.yarn_job_name_prefix, full_table),
'--username' , config.user,
'--password-file' , config.password_file,
'--connect' , config.jdbc_host + config.dbname +
config.dbpostfix,
- # NOTE: using columns/map/where doesn't let you properly decode
varbinary
- # '--table' , table,
- # '--columns' , queries[table].get('columns') or '',
- # '--where' , queries[table].get('where') or '1=1',
'--query' , config.query,
- '--num-mappers' , config.num_mappers,
- '--target-dir' , target_directory,
- '--as-avrodatafile' ,
+ '--outdir' , config.generated_code_dir,
+ '--bindir' , config.generated_code_dir,
]
if config.map_types:
sqoop_arguments += ['--map-column-java', config.map_types]
+
+ if config.just_generate or config.jar_file:
+ # When just_generate, we need to create the class, so we name it
+ # When jar_file, we need to use the class, so we use the same name
+ sqoop_arguments += ['--class-name', config.table]
+
+ if not config.just_generate:
+ sqoop_arguments += [
+ '--target-dir' , target_directory,
+ '--num-mappers' , config.num_mappers,
+ '--as-avrodatafile' ,
+ ]
logging.info('Sqooping with: {}'.format(sqoop_arguments))
check_call(sqoop_arguments)
@@ -373,12 +402,20 @@
num_processors = int(arguments.get('--processors'))
if arguments.get('--processors') else None
max_tries = int(arguments.get('--max-tries'))
force = arguments.get('--force')
+ just_generate = arguments.get('--just-generate-jar')
+ jar_file = arguments.get('--jar-file')
table_path_template = '{hdfs}/{table}/snapshot={snapshot}'.format(
hdfs=target_hdfs_directory,
table='{table}',
snapshot=snapshot,
)
+
+ if just_generate:
+ # When we're just generating, we want to write to the tmp directory
+ table_path_template = '/tmp/sqoop-temp-just-generate/{table}'
+ # deleting any previous temporary files
+ force = True
log_level = logging.INFO
if verbose:
@@ -404,42 +441,58 @@
if not check_hdfs_path(table_path_template, force):
sys.exit(1)
- # read in the wikis to process and sqoop each one
- with open(db_list_file) as dbs_file:
- # Remove lines starting with dashes
- flat_wikis = [row for row in csv.reader(dbs_file) if not
row[0].startswith('#')]
+ generated_code_dir = mkstemp()[1]
+ if just_generate:
+ flat_wikis = [['enwiki', 1]]
+ else:
+ # read in the wikis to process and sqoop each one
+ with open(db_list_file) as dbs_file:
+ # Remove lines starting with dashes
+ flat_wikis = [row for row in csv.reader(dbs_file) if not
row[0].startswith('#')]
- failed_jobs = []
- for group, wikis in groupby(flat_wikis, lambda w: w[1]):
- executor_config_list = []
- for w in wikis:
- for table in queries.keys():
- query = queries[table].get('query')
- map_types = queries[table]['map-types'] if ('map-types' in
queries[table]) else None
-
executor_config_list.append(SqoopConfig(yarn_job_name_prefix, user,
password_file, jdbc_host, num_mappers,
- table_path_template, w[0],
dbpostfix, table, query, map_types, 1))
+ failed_jobs = []
+ for group, wikis in groupby(flat_wikis, lambda w: w[1]):
+ executor_config_list = []
+ for w in wikis:
+ for table in queries.keys():
+ query = queries[table].get('query')
+ map_types = queries[table]['map-types'] if ('map-types' in
queries[table]) else None
+ executor_config_list.append(SqoopConfig(yarn_job_name_prefix,
user, password_file, jdbc_host, num_mappers,
+ table_path_template, w[0],
dbpostfix, table, query, map_types,
+ just_generate, jar_file,
generated_code_dir, 1))
- # sqoop all wikis in this group and wait for them all to finish
with retry
- with futures.ProcessPoolExecutor(num_processors) as executor:
- current_try = 0
- while (executor_config_list and current_try < max_tries):
- executor_config_list = filter(None,
list(executor.map(sqoop_wiki, executor_config_list)))
- current_try += 1
- failed_jobs.extend(executor_config_list)
+ # sqoop all wikis in this group and wait for them all to finish with
retry
+ with futures.ProcessPoolExecutor(num_processors) as executor:
+ current_try = 0
+ while (executor_config_list and current_try < max_tries):
+ executor_config_list = filter(None,
list(executor.map(sqoop_wiki, executor_config_list)))
+ current_try += 1
+ failed_jobs.extend(executor_config_list)
- # if there were no failures at all, write a success flag to this
dataset
- if not failed_jobs:
+ # if there were no failures at all, write a success flag to this dataset
+ if not failed_jobs:
+ if just_generate:
+ # NOTE: if trying to compile generated .java files, this is the
command:
+ # javac -classpath \
+ #
/usr/lib/hadoop/*:/usr/lib/hadoop-0.20-mapreduce/*:/usr/lib/hadoop-hdfs/*:/usr/lib/sqoop/*
\
+ # <<generated-table-file>>.java
+ check_call([
+ 'jar', 'cf', 'sqoop-mediawiki-tables.jar',
+ ' '.join([generated_code_dir + '/*class'])
+ ])
+ logging.info('Generated sqoop-mediawiki-tables.jar')
+ else:
for table in queries.keys():
success_directory = table_path_template.format(table=table)
check_call([
'hdfs', 'dfs', '-touchz',
success_directory + '/_SUCCESS',
])
- else:
- to_rerun = ','.join([ ])
- logging.error('*' * 50)
- logging.error('* Jobs to re-run: {}'.format(to_rerun))
- for c in failed_jobs:
- logging.error('* - {}.{}'.format(c.dbname, c.table))
- logging.error('*' * 50)
- sys.exit(1)
+ else:
+ to_rerun = ','.join(failed_jobs)
+ logging.error('*' * 50)
+ logging.error('* Jobs to re-run: {}'.format(to_rerun))
+ for c in failed_jobs:
+ logging.error('* - {}.{}'.format(c.dbname, c.table))
+ logging.error('*' * 50)
+ sys.exit(1)
diff --git a/setup.cfg b/setup.cfg
index 436cceb..f1a37e3 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -6,5 +6,5 @@
# E711: there are valid reasons to do column != None in SQL Alchemy
ignore = F401,E221,E203,E711
# line lengths should be limited but not to 80
-max-line-length = 90
+max-line-length = 140
exclude = .venv,.tox,dist,doc,build,*.egg
--
To view, visit https://gerrit.wikimedia.org/r/349723
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I984ce686f65ac61802d07812a6f17dba9a92fe3e
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: Milimetric <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits