Milimetric has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/334042 )

Change subject: Update sqoop script with labsdb specificity
......................................................................


Update sqoop script with labsdb specificity

Compared to production, wikis in labsdb are exposed as views instead of
databases, and their names reflect this with a '_p' postfix.
This patch adds a --labsdb parameter to the sqoop script; when the
parameter is set, the needed postfix is appended to the database name
and empty strings are used as default values for the ar_content_format
and ar_content_model fields of the archive table.
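
As an illustration, a minimal sketch of the conditional substitution
(abridged from the populate_queries change in this patch; only the two
affected columns are shown):

    # Choose literal empty strings on labsdb, real columns in production.
    archive_template = '''
        select convert({model} using utf8) ar_content_model,
               convert({format} using utf8) ar_content_format
          from archive
         where $CONDITIONS
    '''

    def archive_query(labsdb):
        return archive_template.format(
            model="''" if labsdb else 'ar_content_model',
            format="''" if labsdb else 'ar_content_format')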

The patch also includes a new --processors parameter that controls how
many concurrent sqoop jobs are launched (labsdb restricts the number of
parallel connections per user to 10).
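
As a rough sketch of how that bound is applied (the worker count and
wiki list here are hypothetical; sqoop_wiki stands in for the real job):

    from concurrent import futures

    def sqoop_wiki(dbname_dbpostfix):
        # Stub for the real per-wiki sqoop job.
        dbname, dbpostfix = dbname_dbpostfix
        return True

    if __name__ == '__main__':
        # Passing None would fall back to the machine's processor count;
        # 10 keeps labsdb under its 10-connections-per-user limit.
        with futures.ProcessPoolExecutor(10) as executor:
            successes = list(executor.map(
                sqoop_wiki, [('enwiki', '_p'), ('wikidatawiki', '_p')]))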

Bug: T155658
Change-Id: I91a4d0892bfe7ce5ef55035ffab3077845f79288
---
M bin/sqoop-mediawiki-tables
1 file changed, 29 insertions(+), 20 deletions(-)

Approvals:
  Milimetric: Verified; Looks good to me, approved
  Ottomata: Looks good to me, but someone else must approve



diff --git a/bin/sqoop-mediawiki-tables b/bin/sqoop-mediawiki-tables
index f3fe526..97d3939 100755
--- a/bin/sqoop-mediawiki-tables
+++ b/bin/sqoop-mediawiki-tables
@@ -31,8 +31,8 @@
 Usage:
   sqoop-mediawiki-tables --jdbc-host HOST --output-dir HDFS_PATH
           [--verbose] --wiki-file WIKIS --timestamp TIME
-          [--mappers NUM] --user NAME --password-file FILE
-          [--job-name HADOOP_JOB_NAME]
+          [--mappers NUM] [--processors NUM] --user NAME
+          [--job-name HADOOP_JOB_NAME] [--labsdb] --password-file FILE
 
 Options:
     -h --help                           Show this help message and exit.
@@ -59,9 +59,13 @@
 
     -m NUM --mappers NUM                The number of mappers to use to sqoop
                                         [optional] default is 1
+    -k NUM --processors NUM             The number of parallel processors sqooping
+                                        [optional] default is the number of
+                                        processors on the machine
     -j JOB_NAME --job-name JOB_NAME     The yarn job name, only one job of a certain
                                         name can run at a time.
                                         [optional] default is sqoop-mediawiki-tables
+    -l --labsdb                         Add '_p' postfix to database names for labsdb
 """
 __author__ = 'Dan Andreesu <[email protected]>'
 
@@ -89,7 +93,7 @@
 # TODO: follow up with labs and DBAs to figure out how to redact this load
 
 
-def populate_queries(timestamp):
+def populate_queries(timestamp, labsdb):
 
     # NOTES on queries:
     # convert(... using utf8) is used to decode varbinary fields into strings
@@ -115,12 +119,13 @@
                     ar_page_id,
                     ar_parent_id,
                     convert(ar_sha1 using utf8) ar_sha1,
-                    convert(ar_content_model using utf8) ar_content_model,
-                    convert(ar_content_format using utf8) ar_content_format
+                    convert({model} using utf8) ar_content_model,
+                    convert({format} using utf8) ar_content_format
 
                from archive
               where $CONDITIONS
-        ''',
+        '''.format(model="''" if labsdb else 'ar_content_model',
+                   format="''" if labsdb else 'ar_content_format'),
         'map-types': '"{}"'.format(','.join([
             'ar_minor_edit=Boolean',
             'ar_deleted=Boolean',
@@ -228,8 +233,7 @@
                 and rev_timestamp <= '{t}'
         '''.format(t=timestamp),
         'map-types': '"{}"'.format(','.join([
-            'rev_minor_edit=Boolean',
-            'rev_deleted=Boolean',
+            'rev_minor_edit=Boolean'
         ])),
     }
 
@@ -261,7 +265,7 @@
     }
 
 
-def sqoop_wiki(dbname):
+def sqoop_wiki(dbname_dbpostfix):
     """
     TODO: pass global values in a config object
     Imports a pre-determined list of tables from dbname
@@ -273,6 +277,7 @@
         True if the sqoop worked
         False if the sqoop errored or failed in any way
     """
+    dbname, dbpostfix = dbname_dbpostfix
     logging.info('STARTING: {db}'.format(db=dbname))
     try:
         for table in queries.keys():
@@ -288,7 +293,7 @@
                 '-D'                , "mapred.job.name='{}'".format(yarn_job_name),
                 '--username'        , user,
                 '--password-file'   , password_file,
-                '--connect'         , jdbc_host + dbname,
+                '--connect'         , jdbc_host + dbname + dbpostfix,
                 # NOTE: using columns/map/where doesn't let you properly decode varbinary
                 # '--table'           , table,
                 # '--columns'         , queries[table].get('columns') or '',
@@ -316,6 +321,7 @@
     # parse arguments
     arguments = docopt(__doc__)
     verbose                             = arguments.get('--verbose')
+    labsdb                              = arguments.get('--labsdb')
     yarn_job_name                       = arguments.get('--job-name')
 
     host                                = arguments.get('--jdbc-host')
@@ -325,13 +331,7 @@
     user                                = arguments.get('--user')
     password_file                       = arguments.get('--password-file')
     num_mappers                         = arguments.get('--mappers') or '1'
-
-    yarn_job_name = yarn_job_name or 'sqoop-mediawiki-tables'
-    if is_yarn_application_running(yarn_job_name):
-        logging.warn('{} is already running, exiting.'.format(yarn_job_name))
-        sys.exit(1)
-
-    jdbc_host = 'jdbc:mysql://' + host + '/'
+    num_processors                      = int(arguments.get('--processors')) if arguments.get('--processors') else None
 
     log_level = logging.INFO
     if verbose:
@@ -341,9 +341,18 @@
                         format='%(asctime)s %(levelname)-6s %(message)s',
                         datefmt='%Y-%m-%dT%H:%M:%S')
 
+    yarn_job_name = yarn_job_name or 'sqoop-mediawiki-tables'
+    if is_yarn_application_running(yarn_job_name):
+        logging.warn('{} is already running, exiting.'.format(yarn_job_name))
+        sys.exit(1)
+
+    jdbc_host = 'jdbc:mysql://' + host + '/'
+
+
     logging.info('Started Shell with {}'.format(' '.join(arguments)))
 
-    populate_queries(timestamp)
+    populate_queries(timestamp, labsdb)
+    dbpostfix = '_p' if labsdb else ''
 
     # read in the wikis to process and sqoop each one
     with open(db_list_file) as dbs_file:
@@ -352,8 +361,8 @@
         groups_done = []
         for group, wikis in groupby(flat_wikis, lambda w: w[1]):
             # sqoop all wikis in this group and wait for them all to finish
-            with futures.ProcessPoolExecutor() as executor:
-                successes = executor.map(sqoop_wiki, [w[0] for w in wikis])
+            with futures.ProcessPoolExecutor(num_processors) as executor:
+                successes = executor.map(sqoop_wiki, [(w[0], dbpostfix) for w in wikis])
                 groups_done.append(all(successes) and any(successes))
 
         # if there were no failures at all, write a success flag to this dataset

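For reference, a hypothetical invocation exercising the two new flags
(host, paths, and file names below are placeholders, not values from
this change):

    bin/sqoop-mediawiki-tables --jdbc-host labsdb.example.org \
            --output-dir /wmf/data/raw/mediawiki --wiki-file wikis.txt \
            --timestamp 20170101000000 --user sqoop \
            --password-file /user/sqoop/pass.file \
            --labsdb --processors 10
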
-- 
To view, visit https://gerrit.wikimedia.org/r/334042
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I91a4d0892bfe7ce5ef55035ffab3077845f79288
Gerrit-PatchSet: 6
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: Joal <[email protected]>
Gerrit-Reviewer: Joal <[email protected]>
Gerrit-Reviewer: Milimetric <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
