Repository: incubator-hawq Updated Branches: refs/heads/master 21b867a10 -> 921b908ef
HAWQ-1024. Add rollback before all necessary exit. Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/31363940 Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/31363940 Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/31363940 Branch: refs/heads/master Commit: 313639400b5a5bdf8afcb8c3123f141b580f238b Parents: 21b867a Author: xunzhang <xunzhang...@gmail.com> Authored: Fri Sep 23 11:23:58 2016 +0800 Committer: xunzhang <xunzhang...@gmail.com> Committed: Fri Sep 23 11:23:58 2016 +0800 ---------------------------------------------------------------------- tools/bin/hawqregister | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/31363940/tools/bin/hawqregister ---------------------------------------------------------------------- diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister index 08041b4..570253f 100755 --- a/tools/bin/hawqregister +++ b/tools/bin/hawqregister @@ -136,6 +136,7 @@ class FailureHandler(object): return ' '.join(lst[:-2] + [lst[-1], lst[-2]]) def rollback(self): + logger.info('Error found, Hawqregister starts to rollback...') for (typ, cmd) in reversed(self.operations): if typ == 'SQL': sql = self.assemble_SQL(cmd) @@ -220,9 +221,11 @@ class GpRegisterAccessor(object): rows = self.exec_query(qry) if len(rows) == 0: logger.error('Table %s is not an append-only table. There is no record in gp_distribution_policy table.' % tablename) + self.failure_handler.rollback() sys.exit(1) if rows[0]['attrnums']: logger.error('Cannot register file(s) to a table which is hash distributed.') + self.failure_handler.rollback() sys.exit(1) # pg_paqseg_# @@ -233,11 +236,13 @@ class GpRegisterAccessor(object): rows = self.exec_query(query) if len(rows) == 0: logger.error('table "%s" not found in db "%s"' % (tablename, database)) + self.failure_handler.rollback() sys.exit(1) relname = rows[0]['relname'] if fmt == 'Parquet': if relname.find('paq') == -1: logger.error("table '%s' is not parquet format" % tablename) + self.failure_handler.rollback() sys.exit(1) return relname @@ -331,6 +336,7 @@ class HawqRegister(object): if self.distribution_policy.startswith('DISTRIBUTED BY'): if len(self.files) % self.bucket_number != 0: logger.error('Files to be registered must be multiple times to the bucket number of hash table.') + self.failure_handler.rollback() sys.exit(1) def check_database_encoding(): @@ -366,6 +372,7 @@ class HawqRegister(object): return else: logger.error('Distribution policy of %s from yaml file is not consistent with the policy of existing table.' % self.tablename) + self.failure_handler.rollback() sys.exit(1) tmp_dict = {} for i, d in enumerate(self.schema): @@ -375,6 +382,7 @@ class HawqRegister(object): original_policy = ','.join([str(tmp_dict[col]) for col in cols]) if policy.strip('{').strip('}') != original_policy: logger.error('Distribution policy of %s from yaml file is not consistent with the policy of existing table.' % self.tablename) + self.failure_handler.rollback() sys.exit(1) def check_bucket_number(): @@ -383,6 +391,7 @@ class HawqRegister(object): if self.bucket_number != get_bucket_number(): logger.error('Bucket number of %s is not consistent with previous bucket number.' % self.tablename) + self.failure_handler.rollback() sys.exit(1) def set_yml_dataa(file_format, files, sizes, tablename, schema, distribution_policy, file_locations,\ @@ -476,12 +485,14 @@ class HawqRegister(object): for sz in self.sizes: if sz < 0: logger.error('File size(%s) in yaml configuration file should not be less than 0.' % sz) + self.failure_handler.rollback() sys.exit(1) for k, fn in enumerate(self.files): hdfscmd = 'hdfs dfs -du %s' % fn _, out, _ = local_ssh_output(hdfscmd) if self.sizes[k] > int(out.strip().split()[0]): logger.error('File size(%s) in yaml configuration file should not exceed actual length(%s) of file %s.' % (self.sizes[k], out.strip().split()[0], fn)) + self.failure_handler.rollback() sys.exit(1) def check_no_regex_filepath(files): @@ -490,10 +501,12 @@ class HawqRegister(object): for v in tmp_lst: if v == '.': logger.error('Hawq register does not support file path with regex: %s.' % fn) + self.failure_handler.rollback() sys.exit(1) for ch in ['..', '*']: if fn.find(ch) != -1: logger.error('Hawq register does not support file path with regex: %s.' % fn) + self.failure_handler.rollback() sys.exit(1) if self.yml: @@ -524,6 +537,7 @@ class HawqRegister(object): if self.mode == 'repair': if self.tabledir.strip('/') != self.filepath.strip('/'): logger.error("In repair mode, file path from yaml file should be the same with table's path.") + self.failure_handler.rollback() sys.exit(1) existed_files, existed_sizes = self._get_files_in_hdfs(self.filepath) existed_info = {} @@ -532,14 +546,17 @@ class HawqRegister(object): for k, fn in enumerate(self.files): if fn not in existed_files: logger.error('Can not register in repair mode since giving non-existing file: %s.' % fn) + self.failure_handler.rollback() sys.exit(1) if self.sizes[k] > existed_info[fn]: logger.error('Can not register in repair mode since giving larger file size: %s' % self.sizes[k]) + self.failure_handler.rollback() sys.exit(1) if self.mode == 'usage2_table_exist': if self.tabledir.strip('/') == self.filepath.strip('/'): logger.error('Files to be registered should not be the same with table path.') + self.failure_handler.rollback() sys.exit(1) self.do_not_move, self.files_update, self.sizes_update = False, [], [] @@ -548,12 +565,14 @@ class HawqRegister(object): if len(self.files) == len(existed_files): if sorted(self.files) != sorted(existed_files): logger.error('In force mode, you should include existing table files in yaml configuration file. Otherwise you should drop the previous table before register --force.') + self.failure_handler.rollback() sys.exit(1) else: self.do_not_move, self.files_update, self.sizes_update = True, self.files, self.sizes self.files, self.sizes = [], [] elif len(self.files) < len(existed_files): logger.error('In force mode, you should include existing table files in yaml configuration file. Otherwise you should drop the previous table before register --force.') + self.failure_handler.rollback() sys.exit(1) else: files_old, sizes_old = [f for f in self.files], [sz for sz in self.sizes] @@ -565,6 +584,7 @@ class HawqRegister(object): self.sizes.remove(sizes_old[k]) if sorted(self.files_update) != sorted(existed_files): logger.error('In force mode, you should include existing table files in yaml configuration file. Otherwise you should drop the previous table before register --force.') + self.failure_handler.rollback() sys.exit(1) elif self.mode == 'repair': @@ -588,6 +608,7 @@ class HawqRegister(object): if self.filesize is not None: if len(self.files) != 1: logger.error('-e option is only supported with single file case.') + self.failure_handler.rollback() sys.exit(1) self.sizes = [self.filesize] check_sizes_valid() @@ -609,12 +630,14 @@ class HawqRegister(object): filesystem = filepath.split('://') if filesystem[0] != 'hdfs': logger.error('Only support registering file(s) in hdfs.') + self.failure_handler.rollback() sys.exit(1) fileroot = filepath.split('/') tableroot = tabledir.split('/') # check the root url of them. eg: for 'hdfs://localhost:8020/temp/tempfile', we check 'hdfs://localohst:8020' if fileroot[0:3] != tableroot[0:3]: logger.error("Files to be registered and the table are not in the same hdfs cluster.\nFile(s) to be registered: '%s'\nTable path in HDFS: '%s'." % (filepath, tabledir)) + self.failure_handler.rollback() sys.exit(1) def _get_files_in_hdfs(self, filepath): @@ -624,6 +647,7 @@ class HawqRegister(object): result = local_ssh(hdfscmd, logger) if result != 0: logger.error("Path '%s' does not exist in hdfs" % filepath) + self.failure_handler.rollback() sys.exit(1) hdfscmd = "hdfs dfs -ls -R %s" % filepath result, out, err = local_ssh_output(hdfscmd) @@ -636,6 +660,7 @@ class HawqRegister(object): sizes.append(int(lineargs[4])) if len(files) == 0 and self.mode != 'force': logger.error("Dir '%s' is empty" % filepath) + self.failure_handler.rollback() sys.exit(1) return files, sizes @@ -652,6 +677,7 @@ class HawqRegister(object): result2 = local_ssh(hdfscmd, logger) if result1 or result2: logger.error('File %s is not parquet format' % f) + self.failure_handler.rollback() sys.exit(1) def _move_files_in_hdfs(self): @@ -678,6 +704,7 @@ class HawqRegister(object): result = local_ssh(hdfscmd, logger) if result != 0: logger.error('Fail to delete %s ' % fn) + self.failure_handler.rollback() sys.exit(1) def _modify_metadata(self, mode):