Milimetric has uploaded a new change for review. (https://gerrit.wikimedia.org/r/363735)

Change subject: Implement sqooping with mappers > 1
......................................................................

Implement sqooping with mappers > 1

Bug: T169782
Change-Id: I4b6c496d77f855d5d6f2025297c94dcc39de3a37
---
M bin/sqoop-mediawiki-tables
1 file changed, 35 insertions(+), 5 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery refs/changes/35/363735/1

diff --git a/bin/sqoop-mediawiki-tables b/bin/sqoop-mediawiki-tables
index 176728e..1b30bac 100755
--- a/bin/sqoop-mediawiki-tables
+++ b/bin/sqoop-mediawiki-tables
@@ -146,6 +146,8 @@
             'ar_minor_edit=Boolean',
             'ar_deleted=Integer',
         ])),
+
+        'split-by': 'ar_id',
     }
 
     queries['ipblocks'] = {
@@ -181,6 +183,8 @@
             'ipb_block_email=Boolean',
             'ipb_allow_usertalk=Boolean',
         ])),
+
+        'split-by': 'ipb_id',
     }
 
     queries['logging'] = {
@@ -201,6 +205,8 @@
                from logging
               where $CONDITIONS
         ''',
+
+        'split-by': 'log_id',
     }
 
     queries['page'] = {
@@ -225,6 +231,8 @@
             'page_is_redirect=Boolean',
             'page_is_new=Boolean',
         ])),
+
+        'split-by': 'page_id',
     }
 
     queries['pagelinks'] = {
@@ -237,6 +245,8 @@
                from pagelinks
               where $CONDITIONS
         ''',
+
+        'split-by': 'pl_from',
     }
 
     queries['redirect'] = {
@@ -250,6 +260,8 @@
                from redirect
               where $CONDITIONS
         ''',
+
+        'split-by': 'rd_from',
     }
 
     queries['revision'] = {
@@ -277,6 +289,8 @@
             'rev_minor_edit=Boolean',
             'rev_deleted=Integer',
         ])),
+
+        'split-by': 'rev_id',
     }
 
     queries['user'] = {
@@ -294,6 +308,8 @@
                from user
               where $CONDITIONS
         ''',
+
+        'split-by': 'user_id',
     }
 
     queries['user_groups'] = {
@@ -304,13 +320,16 @@
                from user_groups
               where $CONDITIONS
         ''',
+
+        'split-by': 'ug_user',
     }
 
 
 class SqoopConfig:
 
     def __init__(self, yarn_job_name_prefix, user, password_file, jdbc_host, 
num_mappers,
-                 table_path_template, dbname, dbpostfix, table, query, 
map_types,
+                 table_path_template, dbname, dbpostfix, table,
+                 query, split_by, map_types,
                  generate_jar, jar_file,
                  current_try):
 
@@ -318,12 +337,13 @@
         self.user = user
         self.password_file = password_file
         self.jdbc_host = jdbc_host
-        self.num_mappers, = num_mappers
+        self.num_mappers = num_mappers
         self.table_path_template = table_path_template
         self.dbname = dbname
         self.dbpostfix = dbpostfix
         self.table = table
         self.query = query
+        self.split_by = split_by
         self.map_types = map_types
         self.generate_jar = generate_jar
         self.jar_file = jar_file
@@ -341,6 +361,10 @@
         True if the sqoop worked
         False if the sqoop errored or failed in any way
     """
+    # debug
+    if config.table != 'revision':
+        return None
+
     full_table = '.'.join([config.dbname, config.table])
     log_message = '{} (try {})'.format(full_table, config.current_try)
     logging.info('STARTING: {}'.format(log_message))
@@ -374,9 +398,13 @@
         else:
             sqoop_arguments += [
                 '--target-dir'      , target_directory,
-                '--num-mappers'     , config.num_mappers,
+                '--num-mappers'     , str(config.num_mappers),
                 '--as-avrodatafile' ,
             ]
+            if config.num_mappers > 1:
+                sqoop_arguments += [
+                    '--split-by'    , config.split_by,
+                ]
 
         if config.jar_file:
             sqoop_arguments += [
@@ -435,7 +463,7 @@
     snapshot                            = arguments.get('--snapshot')
     user                                = arguments.get('--user')
     password_file                       = arguments.get('--password-file')
-    num_mappers                         = arguments.get('--mappers') or '1'
+    num_mappers                         = int(arguments.get('--mappers') or 
'1')
     num_processors                      = int(arguments.get('--processors')) 
if arguments.get('--processors') else None
     max_tries                           = int(arguments.get('--max-tries'))
     force                               = arguments.get('--force')
@@ -488,8 +516,10 @@
             for table in queries.keys():
                 query = queries[table].get('query')
                 map_types = queries[table]['map-types'] if ('map-types' in 
queries[table]) else None
+                split_by = queries[table]['split-by']
                 executor_config_list.append(SqoopConfig(yarn_job_name_prefix, 
user, password_file, jdbc_host, num_mappers,
-                                            table_path_template, w[0], 
dbpostfix, table, query, map_types,
+                                            table_path_template, w[0], 
dbpostfix, table,
+                                            query, split_by, map_types,
                                             generate_jar, jar_file, 1))
 
         # sqoop all wikis in this group and wait for them all to finish with 
retry

-- 
To view, visit https://gerrit.wikimedia.org/r/363735
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I4b6c496d77f855d5d6f2025297c94dcc39de3a37
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: Milimetric <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to