Milimetric has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/363735 )

Change subject: Implement sqooping with mappers > 1
......................................................................


Implement sqooping with mappers > 1

Bug: T169782
Change-Id: I4b6c496d77f855d5d6f2025297c94dcc39de3a37
---
M bin/sqoop-mediawiki-tables
1 file changed, 54 insertions(+), 9 deletions(-)

Approvals:
  Mforns: Looks good to me, approved
  Milimetric: Verified



diff --git a/bin/sqoop-mediawiki-tables b/bin/sqoop-mediawiki-tables
index 176728e..4d5fa9d 100755
--- a/bin/sqoop-mediawiki-tables
+++ b/bin/sqoop-mediawiki-tables
@@ -60,9 +60,17 @@
     -u NAME --user NAME                 mysql user to use
     -p FILE --password-file FILE        File with mysql password to use
 
-    -m NUM --mappers NUM                The number of mappers to use to sqoop
+    -m NUM --mappers NUM                The number of mappers to use to sqoop big tables
+                                        If the group of wikis from --wiki-file is larger
+                                        than 3 wikis, this number will get changed to 1
+                                        regardless of what is passed in, to prevent too
+                                        many parallel connections to the source database
                                         [optional] default is 1
     -k NUM --processors NUM             The number of parallel processors sqooping
+                                        If the group of wikis from --wiki-file is smaller
+                                        than 3 wikis, this number will be changed to 1
+                                        regardless of what is passed in, to prevent too
+                                        many parallel sqoop jobs on a big wiki
                                         [optional] default is the number of
                                         processors on the machine
     -j JOB_NAME --job-name JOB_NAME     The yarn job name prefix, only one job with
@@ -146,6 +154,8 @@
             'ar_minor_edit=Boolean',
             'ar_deleted=Integer',
         ])),
+
+        'split-by': 'ar_id',
     }
 
     queries['ipblocks'] = {
@@ -181,6 +191,8 @@
             'ipb_block_email=Boolean',
             'ipb_allow_usertalk=Boolean',
         ])),
+
+        'split-by': 'ipb_id',
     }
 
     queries['logging'] = {
@@ -201,6 +213,8 @@
                from logging
               where $CONDITIONS
         ''',
+
+        'split-by': 'log_id',
     }
 
     queries['page'] = {
@@ -225,6 +239,8 @@
             'page_is_redirect=Boolean',
             'page_is_new=Boolean',
         ])),
+
+        'split-by': 'page_id',
     }
 
     queries['pagelinks'] = {
@@ -237,6 +253,8 @@
                from pagelinks
               where $CONDITIONS
         ''',
+
+        'split-by': 'pl_from',
     }
 
     queries['redirect'] = {
@@ -250,6 +268,8 @@
                from redirect
               where $CONDITIONS
         ''',
+
+        'split-by': 'rd_from',
     }
 
     queries['revision'] = {
@@ -277,6 +297,8 @@
             'rev_minor_edit=Boolean',
             'rev_deleted=Integer',
         ])),
+
+        'split-by': 'rev_id',
     }
 
     queries['user'] = {
@@ -294,6 +316,8 @@
                from user
               where $CONDITIONS
         ''',
+
+        'split-by': 'user_id',
     }
 
     queries['user_groups'] = {
@@ -304,13 +328,16 @@
                from user_groups
               where $CONDITIONS
         ''',
+
+        'split-by': 'ug_user',
     }
 
 
 class SqoopConfig:
 
     def __init__(self, yarn_job_name_prefix, user, password_file, jdbc_host, num_mappers,
-                 table_path_template, dbname, dbpostfix, table, query, map_types,
+                 table_path_template, dbname, dbpostfix, table,
+                 query, split_by, map_types,
                  generate_jar, jar_file,
                  current_try):
 
@@ -318,12 +345,13 @@
         self.user = user
         self.password_file = password_file
         self.jdbc_host = jdbc_host
-        self.num_mappers, = num_mappers
+        self.num_mappers = num_mappers
         self.table_path_template = table_path_template
         self.dbname = dbname
         self.dbpostfix = dbpostfix
         self.table = table
         self.query = query
+        self.split_by = split_by
         self.map_types = map_types
         self.generate_jar = generate_jar
         self.jar_file = jar_file
@@ -374,9 +402,13 @@
         else:
             sqoop_arguments += [
                 '--target-dir'      , target_directory,
-                '--num-mappers'     , config.num_mappers,
+                '--num-mappers'     , str(config.num_mappers),
                 '--as-avrodatafile' ,
             ]
+            if config.num_mappers > 1:
+                sqoop_arguments += [
+                    '--split-by'    , config.split_by,
+                ]
 
         if config.jar_file:
             sqoop_arguments += [
@@ -391,7 +423,7 @@
 
         # Force deletion of possibly existing dir in case of retry
         if config.current_try > 1:
-          sqoop_arguments += [
+            sqoop_arguments += [
                 '--delete-target-dir'
             ]
 
@@ -435,7 +467,7 @@
     snapshot                            = arguments.get('--snapshot')
     user                                = arguments.get('--user')
     password_file                       = arguments.get('--password-file')
-    num_mappers                         = arguments.get('--mappers') or '1'
+    num_mappers                         = int(arguments.get('--mappers') or '1')
     num_processors                      = int(arguments.get('--processors')) if arguments.get('--processors') else None
     max_tries                           = int(arguments.get('--max-tries'))
     force                               = arguments.get('--force')
@@ -484,16 +516,29 @@
     failed_jobs = []
     for group, wikis in groupby(flat_wikis, lambda w: w[1]):
         executor_config_list = []
+
+        # If there are many wikis in a group set num_mappers to 1
+        # This is because you don't want to sqoop in parallel when the amount of data is small
+        # Also, do the opposite with num_processors for the same reason
+        num_mappers_adjusted = num_mappers
+        num_processors_adjusted = 1
+        if len(wikis) > 3:
+            num_mappers_adjusted = 1
+            num_processors_adjusted = num_processors
+
         for w in wikis:
             for table in queries.keys():
                 query = queries[table].get('query')
                 map_types = queries[table]['map-types'] if ('map-types' in queries[table]) else None
-                executor_config_list.append(SqoopConfig(yarn_job_name_prefix, user, password_file, jdbc_host, num_mappers,
-                                            table_path_template, w[0], dbpostfix, table, query, map_types,
+                split_by = queries[table]['split-by']
+                executor_config_list.append(SqoopConfig(yarn_job_name_prefix, user, password_file, jdbc_host,
+                                            num_mappers_adjusted,
+                                            table_path_template, w[0], dbpostfix, table,
+                                            query, split_by, map_types,
                                             generate_jar, jar_file, 1))

         # sqoop all wikis in this group and wait for them all to finish with retry
-        with futures.ProcessPoolExecutor(num_processors) as executor:
+        with futures.ProcessPoolExecutor(num_processors_adjusted) as executor:
             current_try = 0
             while (executor_config_list and current_try < max_tries):
                 executor_config_list = filter(None, list(executor.map(sqoop_wiki, executor_config_list)))

-- 
To view, visit https://gerrit.wikimedia.org/r/363735
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I4b6c496d77f855d5d6f2025297c94dcc39de3a37
Gerrit-PatchSet: 3
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: Milimetric <[email protected]>
Gerrit-Reviewer: Mforns <[email protected]>
Gerrit-Reviewer: Milimetric <[email protected]>
Gerrit-Reviewer: Nuria <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to