Ottomata has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/349723 )

Change subject: Add --generate-jar and --jar-file options
......................................................................


Add --generate-jar and --jar-file options

This change adds an option to generate a jar file containing all the
ORM java code that sqoop would otherwise have to generate by querying
the source database.  It also adds an option to use that generated jar,
and commits the jar that was generated during testing to artifacts/

Open question: sqoop accepts the --jar-file parameter and uses it:

INFO tool.CodeGenTool: Using existing jar: ...jar

However, it's still executing a SQL query and generating the avsc file
for each table.  I wonder if this is just unavoidable.
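For context, the patch below swaps sqoop's `import` command for `codegen` and appends an always-false predicate so code generation reads column metadata without moving any rows. A minimal sketch of that argument selection (`build_sqoop_args` and the `target_dir` default are illustrative helpers, not part of the script):

```python
def build_sqoop_args(query, table, generate_jar=None, jar_file=None,
                     target_dir='/tmp/sqoop-target'):
    """Sketch of the codegen/import switch introduced by this patch."""
    command = 'import'
    if generate_jar:
        # 'and 1=0' makes the WHERE clause always false, so codegen can
        # inspect column metadata without transferring any rows
        query = query + ' and 1=0'
        command = 'codegen'

    args = ['sqoop', command, '--query', query]

    if generate_jar:
        # write generated .java and .class files where we can jar them up
        args += ['--class-name', table,
                 '--outdir', generate_jar,
                 '--bindir', generate_jar]
    else:
        args += ['--target-dir', target_dir, '--as-avrodatafile']

    if jar_file:
        # reuse pre-compiled ORM classes; sqoop skips compilation but, as
        # noted above, still runs a metadata query per table
        args += ['--class-name', table, '--jar-file', jar_file]

    return args
```

With `generate_jar` set, the result is a `sqoop codegen` invocation; with `jar_file` set, a normal import that reuses the bundled classes.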

Deployment plan:
* (DONE) generate the jar, commit and merge to this repo
* (DONE) test running the sqoop with a --jar-file
* change the sqoop cron in puppet to use the new jar

NOTE: to compile these ORM .java files manually, the command is:
javac -classpath \
  '/usr/lib/hadoop/*:/usr/lib/hadoop-0.20-mapreduce/*:/usr/lib/hadoop-hdfs/*:/usr/lib/sqoop/*' \
  <<pattern-for-all-your-table-ORM-files>>.java

Bug: T143119
Change-Id: I984ce686f65ac61802d07812a6f17dba9a92fe3e
---
A artifacts/mediawiki-tables-sqoop-orm.jar
M bin/sqoop-mediawiki-tables
M setup.cfg
3 files changed, 100 insertions(+), 51 deletions(-)

Approvals:
  Ottomata: Verified; Looks good to me, approved
  Joal: Looks good to me, but someone else must approve



diff --git a/artifacts/mediawiki-tables-sqoop-orm.jar b/artifacts/mediawiki-tables-sqoop-orm.jar
new file mode 100644
index 0000000..0e227df
--- /dev/null
+++ b/artifacts/mediawiki-tables-sqoop-orm.jar
@@ -0,0 +1 @@
+#$# git-fat f17fd1e2705c0c5fec37fc92d9a05f302dd9fbf4               202764
diff --git a/bin/sqoop-mediawiki-tables b/bin/sqoop-mediawiki-tables
index 200523d..a2571ae 100755
--- a/bin/sqoop-mediawiki-tables
+++ b/bin/sqoop-mediawiki-tables
@@ -33,7 +33,8 @@
           [--verbose] --wiki-file WIKIS --timestamp TIME
           [--max-tries TRIES] [--force] --snapshot SNAPSHOT
           [--mappers NUM] [--processors NUM] --user NAME
-          [--job-name HADOOP_JOB_NAME] [--labsdb] --password-file FILE
+          [--job-name JOB_NAME] [--labsdb] --password-file FILE
+          [--generate-jar JAR_OUT|--jar-file JAR_IN]
 
 Options:
     -h --help                           Show this help message and exit.
@@ -72,11 +73,22 @@
     -l --labsdb                         Add '_p' postfix to table names for labsdb
     -f --force                          Deleting existing folders before importing
                                         instead of failing
+    -g JAR_OUT --generate-jar JAR_OUT   Instead of running the job, just pick one wiki
+                                        and generate the java classes for each table,
+                                        then bundle them into a JAR for later use. Save
+                                        the generated jar to JAR_OUT, in a file named
+                                        JAR_OUT/mediawiki-tables-sqoop-orm.jar
+                                        NOTE: etwiki will be used to generate ORM classes
+    -r JAR_IN --jar-file JAR_IN         Disable code generation and use a jar file
+                                        with pre-compiled ORM classes.  The class names
+                                        will be convention-based and assumed to be the
+                                        same as running this script with -g
 """
 __author__ = 'Dan Andreesu <[email protected]>'
 
 import csv
 import logging
+import os
 import sys
 
 from docopt import docopt
@@ -84,18 +96,17 @@
 from itertools import groupby
 from subprocess import check_call
 from traceback import format_exc
+from tempfile import mkstemp
 
 from refinery.util import is_yarn_application_running, HdfsUtils
 
 queries = {}
 
-# NOTE: labsdb excludes fields according to:
-# https://github.com/wikimedia/operations-software-redactatron/blob/master/scripts/cols.txt
-# But it's more complicated, rows are also redacted, some reading and background:
-# https://wikitech.wikimedia.org/wiki/MariaDB/Sanitarium_and_Labsdbs
-# https://phabricator.wikimedia.org/T103011#2296587
-# https://phabricator.wikimedia.org/T138450
-# TODO: follow up with labs and DBAs to figure out how to redact this load
+print('************ NOTE ************')
+print('When sqooping from labs, resulting data will be shareable with the public '
+      'but when sqooping from production, resulting data may need to be redacted or '
+      'otherwise anonymized before sharing.')
+print('^^^^^^^^^^^^ NOTE ^^^^^^^^^^^^')
 
 
 def populate_queries(timestamp, labsdb):
@@ -274,7 +285,10 @@
 class SqoopConfig:
 
     def __init__(self, yarn_job_name_prefix, user, password_file, jdbc_host, num_mappers,
-                 table_path_template, dbname, dbpostfix, table, query, map_types, current_try):
+                 table_path_template, dbname, dbpostfix, table, query, map_types,
+                 generate_jar, jar_file,
+                 current_try):
+
         self.yarn_job_name_prefix = yarn_job_name_prefix
         self.user = user
         self.password_file = password_file
@@ -286,8 +300,9 @@
         self.table = table
         self.query = query
         self.map_types = map_types
+        self.generate_jar = generate_jar
+        self.jar_file = jar_file
         self.current_try = current_try
-
 
 
 def sqoop_wiki(config):
@@ -306,28 +321,48 @@
     logging.info('STARTING: {}'.format(log_message))
     try:
         target_directory = (config.table_path_template + '/wiki_db={db}').format(
-                    table=config.table, db=config.dbname)
+            table=config.table, db=config.dbname)
+
+        query = config.query
+        command = 'import'
+        if config.generate_jar:
+            query = query + ' and 1=0'
+            command = 'codegen'
+
         sqoop_arguments = [
             'sqoop',
-            'import',
-
+            command,
             '-D'                , "mapred.job.name='{}-{}'".format(
                 config.yarn_job_name_prefix, full_table),
             '--username'        , config.user,
             '--password-file'   , config.password_file,
             '--connect'         , config.jdbc_host + config.dbname + config.dbpostfix,
-            # NOTE: using columns/map/where doesn't let you properly decode varbinary
-            # '--table'           , table,
-            # '--columns'         , queries[table].get('columns') or '',
-            # '--where'           , queries[table].get('where') or '1=1',
             '--query'           , config.query,
-            '--num-mappers'     , config.num_mappers,
-            '--target-dir'      , target_directory,
-            '--as-avrodatafile' ,
         ]
 
+        if config.generate_jar:
+            sqoop_arguments += [
+                '--class-name'      , config.table,
+                '--outdir'          , config.generate_jar,
+                '--bindir'          , config.generate_jar,
+            ]
+        else:
+            sqoop_arguments += [
+                '--target-dir'      , target_directory,
+                '--num-mappers'     , config.num_mappers,
+                '--as-avrodatafile' ,
+            ]
+
+        if config.jar_file:
+            sqoop_arguments += [
+                '--class-name'      , config.table,
+                '--jar-file'        , config.jar_file,
+            ]
+
         if config.map_types:
-            sqoop_arguments += ['--map-column-java', config.map_types]
+            sqoop_arguments += [
+                '--map-column-java' , config.map_types
+            ]
 
         logging.info('Sqooping with: {}'.format(sqoop_arguments))
         check_call(sqoop_arguments)
@@ -373,6 +408,8 @@
     num_processors                      = int(arguments.get('--processors')) if arguments.get('--processors') else None
     max_tries                           = int(arguments.get('--max-tries'))
     force                               = arguments.get('--force')
+    generate_jar                        = arguments.get('--generate-jar')
+    jar_file                            = arguments.get('--jar-file')
 
     table_path_template = '{hdfs}/{table}/snapshot={snapshot}'.format(
         hdfs=target_hdfs_directory,
@@ -404,31 +441,37 @@
     if not check_hdfs_path(table_path_template, force):
         sys.exit(1)
 
-    # read in the wikis to process and sqoop each one
-    with open(db_list_file) as dbs_file:
-        # Remove lines starting with dashes
-        flat_wikis = [row for row in csv.reader(dbs_file) if not row[0].startswith('#')]
+    if generate_jar:
+        flat_wikis = [['etwiki', 1]]
+        jar_path = os.path.join(generate_jar, 'mediawiki-tables-sqoop-orm.jar')
+    else:
+        # read in the wikis to process and sqoop each one
+        with open(db_list_file) as dbs_file:
+            # Remove lines starting with dashes
+            flat_wikis = [row for row in csv.reader(dbs_file) if not row[0].startswith('#')]
 
-        failed_jobs = []
-        for group, wikis in groupby(flat_wikis, lambda w: w[1]):
-            executor_config_list = []
-            for w in wikis:
-                for table in queries.keys():
-                    query = queries[table].get('query')
-                    map_types = queries[table]['map-types'] if ('map-types' in queries[table]) else None
-                    executor_config_list.append(SqoopConfig(yarn_job_name_prefix, user, password_file, jdbc_host, num_mappers,
-                                                table_path_template, w[0], dbpostfix, table, query, map_types, 1))
+    failed_jobs = []
+    for group, wikis in groupby(flat_wikis, lambda w: w[1]):
+        executor_config_list = []
+        for w in wikis:
+            for table in queries.keys():
+                query = queries[table].get('query')
+                map_types = queries[table]['map-types'] if ('map-types' in queries[table]) else None
+                executor_config_list.append(SqoopConfig(yarn_job_name_prefix, user, password_file, jdbc_host, num_mappers,
+                                            table_path_template, w[0], dbpostfix, table, query, map_types,
+                                            generate_jar, jar_file, 1))
 
-            # sqoop all wikis in this group and wait for them all to finish with retry
-            with futures.ProcessPoolExecutor(num_processors) as executor:
-                current_try = 0
-                while (executor_config_list and current_try < max_tries):
-                    executor_config_list = filter(None, list(executor.map(sqoop_wiki, executor_config_list)))
-                    current_try += 1
-            failed_jobs.extend(executor_config_list)
+        # sqoop all wikis in this group and wait for them all to finish with retry
+        with futures.ProcessPoolExecutor(num_processors) as executor:
+            current_try = 0
+            while (executor_config_list and current_try < max_tries):
+                executor_config_list = filter(None, list(executor.map(sqoop_wiki, executor_config_list)))
+                current_try += 1
+        failed_jobs.extend(executor_config_list)
 
-        # if there were no failures at all, write a success flag to this dataset
-        if not failed_jobs:
+    # if there were no failures at all, write a success flag to this dataset
+    if not failed_jobs:
+        if not generate_jar:
             for table in queries.keys():
                 success_directory = table_path_template.format(table=table)
                 check_call([
@@ -436,10 +479,15 @@
                     success_directory + '/_SUCCESS',
                 ])
         else:
-            to_rerun = ','.join([ ])
-            logging.error('*' * 50)
-            logging.error('*  Jobs to re-run: {}'.format(to_rerun))
-            for c in failed_jobs:
-                logging.error('*    - {}.{}'.format(c.dbname, c.table))
-            logging.error('*' * 50)
-            sys.exit(1)
+            check_call([
+                'jar', 'cf', jar_path, '-C', generate_jar, '.'
+            ])
+            logging.info('Generated ORM jar at {}'.format(jar_path))
+    else:
+        to_rerun = ','.join(failed_jobs)
+        logging.error('*' * 50)
+        logging.error('*  Jobs to re-run: {}'.format(to_rerun))
+        for c in failed_jobs:
+            logging.error('*    - {}.{}'.format(c.dbname, c.table))
+        logging.error('*' * 50)
+        sys.exit(1)
diff --git a/setup.cfg b/setup.cfg
index 436cceb..f1a37e3 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -6,5 +6,5 @@
 # E711: there are valid reasons to do column != None in SQL Alchemy
 ignore = F401,E221,E203,E711
 # line lengths should be limited but not to 80
-max-line-length = 90
+max-line-length = 140
 exclude = .venv,.tox,dist,doc,build,*.egg

-- 
To view, visit https://gerrit.wikimedia.org/r/349723
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I984ce686f65ac61802d07812a6f17dba9a92fe3e
Gerrit-PatchSet: 3
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: Milimetric <[email protected]>
Gerrit-Reviewer: Joal <[email protected]>
Gerrit-Reviewer: Milimetric <[email protected]>
Gerrit-Reviewer: Nuria <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
