This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit d29fab1ad9a32c0200b71506c3b31f1ac8838e63 Author: Joe McDonnell <joemcdonn...@cloudera.com> AuthorDate: Wed Mar 31 19:26:57 2021 -0700 IMPALA-10629: Fix parquet compression codecs for data load scripts Currently, the dataload scripts don't respect non-standard compression codecs when loading Parquet data. They always load Snappy, even when specifying something else like --table_format=parquet/zstd. This fixes the dataload scripts so that they specify the compression_codec query option correctly and thus use the right codec when loading Parquet. For backwards compatibility, this preserves the behavior that parquet/none corresponds to the default compression codec (which is Snappy). This should make it easier to do performance testing on various Parquet codecs (like ZSTD). Testing: - Ran bin/load-data.py -w tpch --table_format=parquet/zstd and checked the codec in the file with the parquet-reader utility Change-Id: I1a346de3e5c4e38328e5a8ce8162697b7dd6553a Reviewed-on: http://gerrit.cloudera.org:8080/17259 Reviewed-by: Joe McDonnell <joemcdonn...@cloudera.com> Tested-by: Joe McDonnell <joemcdonn...@cloudera.com> --- testdata/bin/generate-schema-statements.py | 40 +++++++++++++++++++++++++----- 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/testdata/bin/generate-schema-statements.py b/testdata/bin/generate-schema-statements.py index a69fe9e..72c39f0 100755 --- a/testdata/bin/generate-schema-statements.py +++ b/testdata/bin/generate-schema-statements.py @@ -151,9 +151,22 @@ AVRO_SCHEMA_DIR = "avro_schemas" DEFAULT_FS=os.environ['DEFAULT_FS'] IMPALA_SUPPORTED_INSERT_FORMATS = ['parquet', 'hbase', 'text', 'kudu'] +IMPALA_PARQUET_COMPRESSION_MAP = \ + { + 'uncompressed': 'NONE', + # UGLY: parquet/none always referred to the default compression, which is SNAPPY + # Maintain that for backwards compatibility. 
+ 'none': 'SNAPPY', + 'snap': 'SNAPPY', + 'gzip': 'GZIP', + 'lz4': 'LZ4', + 'zstd': 'ZSTD' + } + COMPRESSION_TYPE = "SET mapred.output.compression.type=%s;" COMPRESSION_ENABLED = "SET hive.exec.compress.output=%s;" COMPRESSION_CODEC = "SET mapred.output.compression.codec=%s;" +IMPALA_COMPRESSION_CODEC = "SET compression_codec=%s;" AVRO_COMPRESSION_CODEC = "SET avro.output.codec=%s;" SET_DYNAMIC_PARTITION_STATEMENT = "SET hive.exec.dynamic.partition=true;" SET_PARTITION_MODE_NONSTRICT_STATEMENT = "SET hive.exec.dynamic.partition.mode=nonstrict;" @@ -423,6 +436,11 @@ def build_codec_enabled_statement(codec): compression_enabled = 'false' if codec == 'none' else 'true' return COMPRESSION_ENABLED % compression_enabled + +def build_impala_parquet_codec_statement(codec): + parquet_codec = IMPALA_PARQUET_COMPRESSION_MAP[codec] + return IMPALA_COMPRESSION_CODEC % parquet_codec + def build_insert_into_statement(insert, db_name, db_suffix, table_name, file_format, hdfs_path, for_impala=False): insert_hint = "/* +shuffle, clustered */" \ @@ -464,15 +482,24 @@ def build_hbase_insert(db_name, db_suffix, table_name): return hbase_insert def build_insert(insert, db_name, db_suffix, file_format, - codec, compression_type, table_name, hdfs_path, create_hive=False): + codec, compression_type, table_name, hdfs_path, create_hive=False, + for_impala=False): # HBASE inserts don't need the hive options to be set, and don't require and HDFS # file location, so they're handled separately. 
if file_format == 'hbase' and not create_hive: return build_hbase_insert(db_name, db_suffix, table_name) - output = build_codec_enabled_statement(codec) + "\n" - output += build_compression_codec_statement(codec, compression_type, file_format) + "\n" + output = '' + if not for_impala: + # If this is not for Impala, then generate the hive codec statements + output += build_codec_enabled_statement(codec) + "\n" + output += build_compression_codec_statement(codec, compression_type, + file_format) + "\n" + elif file_format == 'parquet': + # This is for Impala parquet, add the appropriate codec statement + output += build_impala_parquet_codec_statement(codec) + "\n" output += build_insert_into_statement(insert, db_name, db_suffix, - table_name, file_format, hdfs_path) + "\n" + table_name, file_format, hdfs_path, + for_impala) + "\n" return output def build_load_statement(load_template, db_name, db_suffix, table_name): @@ -760,8 +787,9 @@ def generate_statements(output_name, test_vectors, sections, hive_output.load.append(build_insert(insert_hive, db_name, db_suffix, file_format, codec, compression_type, table_name, data_path)) elif insert: - impala_load.load.append(build_insert_into_statement(insert, db_name, - db_suffix, table_name, file_format, data_path, for_impala=True)) + impala_load.load.append(build_insert(insert, db_name, db_suffix, + file_format, codec, compression_type, table_name, data_path, + for_impala=True)) else: print 'Empty parquet/kudu load for table %s. Skipping insert generation' \ % table_name