Milimetric has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/349723 )

Change subject: [WIP] Add just-generate-jar and jar-file options
......................................................................

[WIP] Add just-generate-jar and jar-file options

Status: this isn't hard, just a little tedious.  Code is a bit of a mess
but conceptually got the generation part down and tested.  Just have to
compile the jar and pass that to an actual import to test, and then
make sure it all works in the script.  Easy peasy.

Bug: T143119
Change-Id: I984ce686f65ac61802d07812a6f17dba9a92fe3e
---
M bin/sqoop-mediawiki-tables
M setup.cfg
2 files changed, 95 insertions(+), 42 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery 
refs/changes/23/349723/1

diff --git a/bin/sqoop-mediawiki-tables b/bin/sqoop-mediawiki-tables
index 200523d..ec6b8cb 100755
--- a/bin/sqoop-mediawiki-tables
+++ b/bin/sqoop-mediawiki-tables
@@ -34,6 +34,7 @@
           [--max-tries TRIES] [--force] --snapshot SNAPSHOT
           [--mappers NUM] [--processors NUM] --user NAME
           [--job-name HADOOP_JOB_NAME] [--labsdb] --password-file FILE
+          [--just-generate-jar JAR_OUTPUT] [--jar-file JAR_FILE]
 
 Options:
     -h --help                           Show this help message and exit.
@@ -72,6 +73,14 @@
     -l --labsdb                         Add '_p' postfix to table names for 
labsdb
     -f --force                          Deleting existing folders before 
importing
                                         instead of failing
+    -g --just-generate-jar JAR_OUTPUT   Instead of running the job, just pick one wiki
+                                        and generate the java classes for each 
table,
+                                        then bundle them into a JAR for later 
use
+                                        NOTE: sets --force to True
+    -r --jar-file JAR_FILE              Disable code generation and use a jar file
+                                        with pre-compiled ORM classes.  The 
class names
+                                        will be convention-based and assumed 
to be the
+                                        same as running this script with -g
 """
 __author__ = 'Dan Andreesu <[email protected]>'
 
@@ -84,6 +93,7 @@
 from itertools import groupby
 from subprocess import check_call
 from traceback import format_exc
+from tempfile import mkstemp
 
 from refinery.util import is_yarn_application_running, HdfsUtils
 
@@ -274,7 +284,10 @@
 class SqoopConfig:
 
     def __init__(self, yarn_job_name_prefix, user, password_file, jdbc_host, 
num_mappers,
-                 table_path_template, dbname, dbpostfix, table, query, 
map_types, current_try):
+                 table_path_template, dbname, dbpostfix, table, query, 
map_types,
+                 just_generate, jar_file, generated_code_dir,
+                 current_try):
+
         self.yarn_job_name_prefix = yarn_job_name_prefix
         self.user = user
         self.password_file = password_file
@@ -286,8 +299,10 @@
         self.table = table
         self.query = query
         self.map_types = map_types
+        self.just_generate = just_generate
+        self.jar_file = jar_file
+        self.generated_code_dir = generated_code_dir
         self.current_try = current_try
-
 
 
 def sqoop_wiki(config):
@@ -306,28 +321,42 @@
     logging.info('STARTING: {}'.format(log_message))
     try:
         target_directory = (config.table_path_template + 
'/wiki_db={db}').format(
-                    table=config.table, db=config.dbname)
+            table=config.table, db=config.dbname)
+
+        query = config.query
+        command = 'import'
+        if config.just_generate:
+            query = query + ' and 1=0'
+            command = 'codegen'
+
         sqoop_arguments = [
             'sqoop',
-            'import',
+            command,
 
             '-D'                , "mapred.job.name='{}-{}'".format(
                 config.yarn_job_name_prefix, full_table),
             '--username'        , config.user,
             '--password-file'   , config.password_file,
             '--connect'         , config.jdbc_host + config.dbname + 
config.dbpostfix,
-            # NOTE: using columns/map/where doesn't let you properly decode 
varbinary
-            # '--table'           , table,
-            # '--columns'         , queries[table].get('columns') or '',
-            # '--where'           , queries[table].get('where') or '1=1',
             '--query'           , config.query,
-            '--num-mappers'     , config.num_mappers,
-            '--target-dir'      , target_directory,
-            '--as-avrodatafile' ,
+            '--outdir'          , config.generated_code_dir,
+            '--bindir'          , config.generated_code_dir,
         ]
 
         if config.map_types:
             sqoop_arguments += ['--map-column-java', config.map_types]
+
+        if config.just_generate or config.jar_file:
+            # When just_generate, we need to create the class, so we name it
+            # When jar_file, we need to use the class, so we use the same name
+            sqoop_arguments += ['--class-name', config.table]
+
+        if not config.just_generate:
+            sqoop_arguments += [
+                '--target-dir'      , target_directory,
+                '--num-mappers'     , config.num_mappers,
+                '--as-avrodatafile' ,
+            ]
 
         logging.info('Sqooping with: {}'.format(sqoop_arguments))
         check_call(sqoop_arguments)
@@ -373,12 +402,20 @@
     num_processors                      = int(arguments.get('--processors')) 
if arguments.get('--processors') else None
     max_tries                           = int(arguments.get('--max-tries'))
     force                               = arguments.get('--force')
+    just_generate                       = arguments.get('--just-generate-jar')
+    jar_file                            = arguments.get('--jar-file')
 
     table_path_template = '{hdfs}/{table}/snapshot={snapshot}'.format(
         hdfs=target_hdfs_directory,
         table='{table}',
         snapshot=snapshot,
     )
+
+    if just_generate:
+        # When we're just generating, we want to write to the tmp directory
+        table_path_template = '/tmp/sqoop-temp-just-generate/{table}'
+        # deleting any previous temporary files
+        force = True
 
     log_level = logging.INFO
     if verbose:
@@ -404,42 +441,58 @@
     if not check_hdfs_path(table_path_template, force):
         sys.exit(1)
 
-    # read in the wikis to process and sqoop each one
-    with open(db_list_file) as dbs_file:
-        # Remove lines starting with dashes
-        flat_wikis = [row for row in csv.reader(dbs_file) if not 
row[0].startswith('#')]
+    generated_code_dir = mkstemp()[1]
+    if just_generate:
+        flat_wikis = [['enwiki', 1]]
+    else:
+        # read in the wikis to process and sqoop each one
+        with open(db_list_file) as dbs_file:
+            # Remove comment lines (those whose first field starts with '#')
+            flat_wikis = [row for row in csv.reader(dbs_file) if not 
row[0].startswith('#')]
 
-        failed_jobs = []
-        for group, wikis in groupby(flat_wikis, lambda w: w[1]):
-            executor_config_list = []
-            for w in wikis:
-                for table in queries.keys():
-                    query = queries[table].get('query')
-                    map_types = queries[table]['map-types'] if ('map-types' in 
queries[table]) else None
-                    
executor_config_list.append(SqoopConfig(yarn_job_name_prefix, user, 
password_file, jdbc_host, num_mappers,
-                                                table_path_template, w[0], 
dbpostfix, table, query, map_types, 1))
+    failed_jobs = []
+    for group, wikis in groupby(flat_wikis, lambda w: w[1]):
+        executor_config_list = []
+        for w in wikis:
+            for table in queries.keys():
+                query = queries[table].get('query')
+                map_types = queries[table]['map-types'] if ('map-types' in 
queries[table]) else None
+                executor_config_list.append(SqoopConfig(yarn_job_name_prefix, 
user, password_file, jdbc_host, num_mappers,
+                                            table_path_template, w[0], 
dbpostfix, table, query, map_types,
+                                            just_generate, jar_file, 
generated_code_dir, 1))
 
-            # sqoop all wikis in this group and wait for them all to finish 
with retry
-            with futures.ProcessPoolExecutor(num_processors) as executor:
-                current_try = 0
-                while (executor_config_list and current_try < max_tries):
-                    executor_config_list = filter(None, 
list(executor.map(sqoop_wiki, executor_config_list)))
-                    current_try += 1
-            failed_jobs.extend(executor_config_list)
+        # sqoop all wikis in this group and wait for them all to finish with 
retry
+        with futures.ProcessPoolExecutor(num_processors) as executor:
+            current_try = 0
+            while (executor_config_list and current_try < max_tries):
+                executor_config_list = filter(None, 
list(executor.map(sqoop_wiki, executor_config_list)))
+                current_try += 1
+        failed_jobs.extend(executor_config_list)
 
-        # if there were no failures at all, write a success flag to this 
dataset
-        if not failed_jobs:
+    # if there were no failures at all, write a success flag to this dataset
+    if not failed_jobs:
+        if just_generate:
+            # NOTE: if trying to compile generated .java files, this is the 
command:
+            # javac -classpath \
+            #   
/usr/lib/hadoop/*:/usr/lib/hadoop-0.20-mapreduce/*:/usr/lib/hadoop-hdfs/*:/usr/lib/sqoop/*
 \
+            #   <<generated-table-file>>.java
+            check_call([
+                'jar', 'cf', 'sqoop-mediawiki-tables.jar',
+                ' '.join([generated_code_dir + '/*class'])
+            ])
+            logging.info('Generated sqoop-mediawiki-tables.jar')
+        else:
             for table in queries.keys():
                 success_directory = table_path_template.format(table=table)
                 check_call([
                     'hdfs', 'dfs', '-touchz',
                     success_directory + '/_SUCCESS',
                 ])
-        else:
-            to_rerun = ','.join([ ])
-            logging.error('*' * 50)
-            logging.error('*  Jobs to re-run: {}'.format(to_rerun))
-            for c in failed_jobs:
-                logging.error('*    - {}.{}'.format(c.dbname, c.table))
-            logging.error('*' * 50)
-            sys.exit(1)
+    else:
+        to_rerun = ','.join(failed_jobs)
+        logging.error('*' * 50)
+        logging.error('*  Jobs to re-run: {}'.format(to_rerun))
+        for c in failed_jobs:
+            logging.error('*    - {}.{}'.format(c.dbname, c.table))
+        logging.error('*' * 50)
+        sys.exit(1)
diff --git a/setup.cfg b/setup.cfg
index 436cceb..f1a37e3 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -6,5 +6,5 @@
 # E711: there are valid reasons to do column != None in SQL Alchemy
 ignore = F401,E221,E203,E711
 # line lengths should be limited but not to 80
-max-line-length = 90
+max-line-length = 140
 exclude = .venv,.tox,dist,doc,build,*.egg

-- 
To view, visit https://gerrit.wikimedia.org/r/349723
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I984ce686f65ac61802d07812a6f17dba9a92fe3e
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: Milimetric <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to