Repository: incubator-hawq
Updated Branches:
  refs/heads/master ef2aef879 -> 77e245dde
HAWQ-1035. Add yml parser to support partition tables.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/77e245dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/77e245dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/77e245dd

Branch: refs/heads/master
Commit: 77e245ddeb3e619159eec90e4934eadc38ed259c
Parents: ef2aef8
Author: hzhang2 <zhanghuan...@163.com>
Authored: Sun Sep 18 16:32:33 2016 +0800
Committer: hzhang2 <zhanghuan...@163.com>
Committed: Sun Sep 18 16:32:33 2016 +0800

----------------------------------------------------------------------
 tools/bin/hawqregister | 92 +++++++++++++++++++++++++++++++++++----------
 1 file changed, 72 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/77e245dd/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index ffae437..0c75662 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -39,7 +39,6 @@ logger = get_default_logger()
 EXECNAME = os.path.split(__file__)[-1]
 setup_tool_logging(EXECNAME, getLocalHostname(), getUserName())
 
-
 def option_parser():
     '''option parser'''
     parser = OptParser(option_class=OptChecker,
@@ -110,23 +109,6 @@ def register_yaml_dict_check(D):
             logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' % 'AO_FileLocations.%s' % attr)
             sys.exit(1)
 
-
-def option_parser_yml(yml_file):
-    import yaml
-    with open(yml_file, 'r') as f:
-        params = yaml.load(f)
-    register_yaml_dict_check(params)
-    if params['FileFormat'].lower() == 'parquet':
-        if not len(params['Parquet_FileLocations']['Files']):
-            return 'Parquet', [], [], params['Parquet_Schema'], params['Distribution_Policy'], params['Parquet_FileLocations'], params['Bucketnum']
-        files, sizes = [params['DFS_URL'] + d['path'] for d in params['Parquet_FileLocations']['Files']], [d['size'] for d in params['Parquet_FileLocations']['Files']]
-        return 'Parquet', files, sizes, params['Parquet_Schema'], params['Distribution_Policy'], params['Parquet_FileLocations'], params['Bucketnum']
-    if not len(params['AO_FileLocations']['Files']):
-        return 'AO', [], [], params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations'], params['Bucketnum']
-    files, sizes = [params['DFS_URL'] + d['path'] for d in params['AO_FileLocations']['Files']], [d['size'] for d in params['AO_FileLocations']['Files']]
-    return 'AO', files, sizes, params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations'], params['Bucketnum']
-
-
 class GpRegisterAccessor(object):
     def __init__(self, conn):
         self.conn = conn
@@ -297,9 +279,79 @@ class HawqRegister(object):
             if self.bucket_number != get_bucket_number():
                 logger.error('Bucket number of %s is not consistent with previous bucket number.' % self.tablename)
                 sys.exit(1)
-
+
+        def set_yml_data(file_format, files, sizes, schema, distribution_policy, file_locations,
+                         bucket_number, partitionby, partitions_constraint, partitions_name, partitions_compression_level,
+                         partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes):
+            self.file_format = file_format
+            self.files = files
+            self.sizes = sizes
+            self.schema = schema
+            self.distribution_policy = distribution_policy
+            self.file_locations = file_locations
+            self.bucket_number = bucket_number
+            self.partitionby = partitionby
+            self.partitions_constraint = partitions_constraint
+            self.partitions_name = partitions_name
+            self.partitions_compression_level = partitions_compression_level
+            self.partitions_compression_type = partitions_compression_type
+            self.partitions_checksum = partitions_checksum
+            self.partitions_filepaths = partitions_filepaths
+            self.partitions_filesizes = partitions_filesizes
+
+        def option_parser_yml(yml_file):
+            import yaml
+            with open(yml_file, 'r') as f:
+                params = yaml.load(f)
+            register_yaml_dict_check(params)
+            partitions_filepaths = []
+            partitions_filesizes = []
+            partitions_constraint = []
+            partitions_name = []
+            partitions_checksum = []
+            partitions_compression_level = []
+            partitions_compression_type = []
+            files = []
+            sizes = []
+
+            if params['FileFormat'].lower() == 'parquet':
+                partitionby = params.get('Parquet_FileLocations').get('PartitionBy')
+                if params.get('Parquet_FileLocations').get('Partitions') and len(params['Parquet_FileLocations']['Partitions']):
+                    partitions_checksum = [d['Checksum'] for d in params['Parquet_FileLocations']['Partitions']]
+                    partitions_compression_level = [d['CompressionLevel'] for d in params['Parquet_FileLocations']['Partitions']]
+                    partitions_compression_type = [d['CompressionType'] for d in params['Parquet_FileLocations']['Partitions']]
+                    partitions_constraint = [d['Constraint'] for d in params['Parquet_FileLocations']['Partitions']]
+                    partitions_files = [d['Files'] for d in params['Parquet_FileLocations']['Partitions']]
+                    if len(partitions_files):
+                        for pfile in partitions_files:
+                            partitions_filepaths.append([params['DFS_URL'] + item['path'] for item in pfile])
+                            partitions_filesizes.append([item['size'] for item in pfile])
+                    partitions_name = [d['Name'] for d in params['Parquet_FileLocations']['Partitions']]
+                if len(params['Parquet_FileLocations']['Files']):
+                    files, sizes = [params['DFS_URL'] + d['path'] for d in params['Parquet_FileLocations']['Files']], [d['size'] for d in params['Parquet_FileLocations']['Files']]
+                set_yml_data('Parquet', files, sizes, params['Parquet_Schema'], params['Distribution_Policy'], params['Parquet_FileLocations'], params['Bucketnum'], partitionby,
+                             partitions_constraint, partitions_name, partitions_compression_level, partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes)
+
+            else:  # AO format
+                partitionby = params.get('AO_FileLocations').get('PartitionBy')
+                if params.get('AO_FileLocations').get('Partitions') and len(params['AO_FileLocations']['Partitions']):
+                    partitions_checksum = [d['Checksum'] for d in params['AO_FileLocations']['Partitions']]
+                    partitions_compression_level = [d['CompressionLevel'] for d in params['AO_FileLocations']['Partitions']]
+                    partitions_compression_type = [d['CompressionType'] for d in params['AO_FileLocations']['Partitions']]
+                    partitions_constraint = [d['Constraint'] for d in params['AO_FileLocations']['Partitions']]
+                    partitions_files = [d['Files'] for d in params['AO_FileLocations']['Partitions']]
+                    if len(partitions_files):
+                        for pfile in partitions_files:
+                            partitions_filepaths.append([params['DFS_URL'] + item['path'] for item in pfile])
+                            partitions_filesizes.append([item['size'] for item in pfile])
+                    partitions_name = [d['Name'] for d in params['AO_FileLocations']['Partitions']]
+                if len(params['AO_FileLocations']['Files']):
+                    files, sizes = [params['DFS_URL'] + d['path'] for d in params['AO_FileLocations']['Files']], [d['size'] for d in params['AO_FileLocations']['Files']]
+                set_yml_data('AO', files, sizes, params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations'], params['Bucketnum'], partitionby, partitions_constraint,
+                             partitions_name, partitions_compression_level, partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes)
+
         if self.yml:
-            self.file_format, self.files, self.sizes, self.schema, self.distribution_policy, self.file_locations, self.bucket_number = option_parser_yml(self.yml)
+            option_parser_yml(options.yml_config)
             self.filepath = self.files[0][:self.files[0].rfind('/')] if self.files else ''
             check_distribution_policy()
             if self.mode != 'force' and self.mode != 'repair':
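----------------------------------------------------------------------

For reference, below is a minimal sketch (not part of the commit) of the YAML
layout the new option_parser_yml expects for a partitioned Parquet table and
of how it flattens the per-partition metadata. All concrete values (the
DFS_URL, paths, sizes, partition names, and constraints) are hypothetical;
the real tool reads the file named by options.yml_config rather than an
inline string, and it calls yaml.load where the sketch uses yaml.safe_load.

# Sketch of the YAML shape consumed by the new parser and of its
# flattening logic. Values are made up for illustration.
import yaml

SAMPLE = """
FileFormat: Parquet
DFS_URL: hdfs://localhost:8020
Parquet_Schema:
- name: id
  type: int4
Distribution_Policy: DISTRIBUTED RANDOMLY
Bucketnum: 6
Parquet_FileLocations:
  PartitionBy: PARTITION BY list(id)
  Files:
  - path: /hawq_default/16385/16387/19339/1
    size: 979
  Partitions:
  - Name: t_1_prt_p1
    Constraint: PARTITION p1 VALUES(1)
    Checksum: false
    CompressionType: none
    CompressionLevel: 0
    Files:
    - path: /hawq_default/16385/16387/19340/1
      size: 979
"""

params = yaml.safe_load(SAMPLE)   # hawqregister itself uses yaml.load on the file
locations = params['Parquet_FileLocations']
partitions = locations.get('Partitions') or []

# One entry per partition, mirroring the lists built by option_parser_yml.
partitions_name = [d['Name'] for d in partitions]
partitions_constraint = [d['Constraint'] for d in partitions]

# Per-partition file lists become lists of lists, with DFS_URL prepended
# to every relative path, exactly as in the committed code.
partitions_filepaths = [[params['DFS_URL'] + f['path'] for f in d['Files']]
                        for d in partitions]
partitions_filesizes = [[f['size'] for f in d['Files']] for d in partitions]

print(partitions_name)        # ['t_1_prt_p1']
print(partitions_filepaths)   # [['hdfs://localhost:8020/hawq_default/16385/16387/19340/1']]
print(partitions_filesizes)   # [[979]]

The AO branch of the parser behaves the same way, reading AO_Schema and
AO_FileLocations in place of the Parquet_* keys.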