Milimetric has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/363735 )
Change subject: Implement sqooping with mappers > 1
......................................................................
Implement sqooping with mappers > 1
Bug: T169782
Change-Id: I4b6c496d77f855d5d6f2025297c94dcc39de3a37
---
M bin/sqoop-mediawiki-tables
1 file changed, 54 insertions(+), 9 deletions(-)
Approvals:
Mforns: Looks good to me, approved
Milimetric: Verified
diff --git a/bin/sqoop-mediawiki-tables b/bin/sqoop-mediawiki-tables
index 176728e..4d5fa9d 100755
--- a/bin/sqoop-mediawiki-tables
+++ b/bin/sqoop-mediawiki-tables
@@ -60,9 +60,17 @@
-u NAME --user NAME mysql user to use
-p FILE --password-file FILE File with mysql password to use
-  -m NUM --mappers NUM            The number of mappers to use to sqoop
+  -m NUM --mappers NUM            The number of mappers to use to sqoop
                                       big tables
+                                      If the group of wikis from --wiki-file is larger
+                                      than 3 wikis, this number will get changed to 1
+                                      regardless of what is passed in, to prevent too
+                                      many parallel connections to the source database
                                       [optional] default is 1
   -k NUM --processors NUM         The number of parallel processors sqooping
+                                      If the group of wikis from --wiki-file is smaller
+                                      than 3 wikis, this number will be changed to 1
+                                      regardless of what is passed in, to prevent too
+                                      many parallel sqoop jobs on a big wiki
                                       [optional] default is the number of processors on the machine
   -j JOB_NAME --job-name JOB_NAME The yarn job name prefix, only one job with
@@ -146,6 +154,8 @@
'ar_minor_edit=Boolean',
'ar_deleted=Integer',
])),
+
+ 'split-by': 'ar_id',
}
queries['ipblocks'] = {
@@ -181,6 +191,8 @@
'ipb_block_email=Boolean',
'ipb_allow_usertalk=Boolean',
])),
+
+ 'split-by': 'ipb_id',
}
queries['logging'] = {
@@ -201,6 +213,8 @@
from logging
where $CONDITIONS
''',
+
+ 'split-by': 'log_id',
}
queries['page'] = {
@@ -225,6 +239,8 @@
'page_is_redirect=Boolean',
'page_is_new=Boolean',
])),
+
+ 'split-by': 'page_id',
}
queries['pagelinks'] = {
@@ -237,6 +253,8 @@
from pagelinks
where $CONDITIONS
''',
+
+ 'split-by': 'pl_from',
}
queries['redirect'] = {
@@ -250,6 +268,8 @@
from redirect
where $CONDITIONS
''',
+
+ 'split-by': 'rd_from',
}
queries['revision'] = {
@@ -277,6 +297,8 @@
'rev_minor_edit=Boolean',
'rev_deleted=Integer',
])),
+
+ 'split-by': 'rev_id',
}
queries['user'] = {
@@ -294,6 +316,8 @@
from user
where $CONDITIONS
''',
+
+ 'split-by': 'user_id',
}
queries['user_groups'] = {
@@ -304,13 +328,16 @@
from user_groups
where $CONDITIONS
''',
+
+ 'split-by': 'ug_user',
}
class SqoopConfig:
     def __init__(self, yarn_job_name_prefix, user, password_file, jdbc_host, num_mappers,
-                 table_path_template, dbname, dbpostfix, table, query, map_types,
+                 table_path_template, dbname, dbpostfix, table,
+                 query, split_by, map_types,
                  generate_jar, jar_file,
                  current_try):
@@ -318,12 +345,13 @@
self.user = user
self.password_file = password_file
self.jdbc_host = jdbc_host
- self.num_mappers, = num_mappers
+ self.num_mappers = num_mappers
self.table_path_template = table_path_template
self.dbname = dbname
self.dbpostfix = dbpostfix
self.table = table
self.query = query
+ self.split_by = split_by
self.map_types = map_types
self.generate_jar = generate_jar
self.jar_file = jar_file
@@ -374,9 +402,13 @@
else:
sqoop_arguments += [
'--target-dir' , target_directory,
- '--num-mappers' , config.num_mappers,
+ '--num-mappers' , str(config.num_mappers),
'--as-avrodatafile' ,
]
+ if config.num_mappers > 1:
+ sqoop_arguments += [
+ '--split-by' , config.split_by,
+ ]
if config.jar_file:
sqoop_arguments += [
@@ -391,7 +423,7 @@
# Force deletion of possibly existing dir in case of retry
if config.current_try > 1:
- sqoop_arguments += [
+ sqoop_arguments += [
'--delete-target-dir'
]
@@ -435,7 +467,7 @@
snapshot = arguments.get('--snapshot')
user = arguments.get('--user')
password_file = arguments.get('--password-file')
-    num_mappers = arguments.get('--mappers') or '1'
+    num_mappers = int(arguments.get('--mappers') or '1')
     num_processors = int(arguments.get('--processors')) if arguments.get('--processors') else None
max_tries = int(arguments.get('--max-tries'))
force = arguments.get('--force')
@@ -484,16 +516,29 @@
failed_jobs = []
for group, wikis in groupby(flat_wikis, lambda w: w[1]):
executor_config_list = []
+
+ # If there are many wikis in a group set num_mappers to 1
+        # This is because you don't want to sqoop in parallel when the amount of data is small
+        # Also, do the opposite with num_processors for the same reason
+ num_mappers_adjusted = num_mappers
+ num_processors_adjusted = 1
+ if len(wikis) > 3:
+ num_mappers_adjusted = 1
+ num_processors_adjusted = num_processors
+
for w in wikis:
for table in queries.keys():
query = queries[table].get('query')
                 map_types = queries[table]['map-types'] if ('map-types' in queries[table]) else None
-                executor_config_list.append(SqoopConfig(yarn_job_name_prefix, user, password_file, jdbc_host, num_mappers,
-                                                        table_path_template, w[0], dbpostfix, table, query, map_types,
+                split_by = queries[table]['split-by']
+                executor_config_list.append(SqoopConfig(yarn_job_name_prefix, user, password_file, jdbc_host,
+                                                        num_mappers_adjusted,
+                                                        table_path_template, w[0], dbpostfix, table,
+                                                        query, split_by, map_types,
                                                         generate_jar, jar_file, 1))
         # sqoop all wikis in this group and wait for them all to finish with retry
- with futures.ProcessPoolExecutor(num_processors) as executor:
+ with futures.ProcessPoolExecutor(num_processors_adjusted) as executor:
current_try = 0
while (executor_config_list and current_try < max_tries):
             executor_config_list = filter(None, list(executor.map(sqoop_wiki, executor_config_list)))
--
To view, visit https://gerrit.wikimedia.org/r/363735
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I4b6c496d77f855d5d6f2025297c94dcc39de3a37
Gerrit-PatchSet: 3
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: Milimetric <[email protected]>
Gerrit-Reviewer: Mforns <[email protected]>
Gerrit-Reviewer: Milimetric <[email protected]>
Gerrit-Reviewer: Nuria <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits