Compact them and remove the small files. One messy way of doing this (including some cleanup) is shown below, based on rdd.mapPartitions() over an RDD of the file URLs. The script imports a small s3_batcher helper that isn't pasted in this message; a rough sketch of that helper comes first, followed by the main job:
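(The body of get_s3_objects_batches below is only a minimal sketch, assuming it is just a thin wrapper around a boto3 list_objects_v2 paginator that yields each page's object summaries; the name and call signature are taken from how the main script uses it.)

# Rough sketch of the s3_batcher helper; the real module isn't included here,
# so the paginator-based body is an assumption inferred from the call site.
from typing import Dict, Iterator, List

from mypy_boto3_s3.client import S3Client


def get_s3_objects_batches(s3client: S3Client, **list_kwargs) -> Iterator[List[Dict]]:
    """Yield one batch of object summaries per list_objects_v2 page (Bucket/Prefix kwargs)."""
    paginator = s3client.get_paginator('list_objects_v2')
    for page in paginator.paginate(**list_kwargs):
        # Pages with no matching keys have no 'Contents' entry.
        yield page.get('Contents', [])

And the main merge/compaction job: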
import gzip
import json
import logging
import math
import re
from functools import partial
from typing import List

import boto3
from mypy_boto3_s3.client import S3Client
from pyspark.sql import SparkSession

import configuration
import data_types.time_series
from data_types.shared import series
from s3_batcher import get_s3_objects_batches

logger = logging.getLogger(__name__)

session = boto3.session.Session()
s3 = session.resource('s3')


def merge_json_files(file_names, config):
    """Runs inside each partition: download, decompress and fix every small file."""
    total = []
    exceptions = []
    for name in file_names:
        try:
            logger.debug(f'Loading {name} ...')
            s3_object = s3.Object(config.get_non_concatenated_bucket_name(), name)
            body = s3_object.get()['Body'].read()
            if name.endswith('.gz'):
                body = gzip.decompress(body)
            company_id = re.search(r'company_id=(\S+)/', name).group(1)
            clazz = config.get_schema_class()
            loaded = json.loads(body)
            fixer: series.SchemaFixer = clazz(company_id=company_id, **loaded)
            total.append(fixer.fix_value())
        except Exception as ex:
            logger.error(f'{name}: {ex}')
            exceptions.append(name)
    if exceptions:
        logger.warning(f'Exceptions: {exceptions}')
    return iter(total)


def get_json_files_list(s3client: S3Client, config: configuration.Config) -> List[str]:
    """Returns the list of object keys under the non-concatenated prefix."""
    logger.info('Loading file list')
    files = []
    # aws s3 ls --summarize --human-readable --recursive \
    #   s3://ingest-measurements-20200610130929653000000001/TIME_SERIES/ --profile hierarchy_playground > list.txt
    for batch in get_s3_objects_batches(s3client,
                                        Bucket=config.get_non_concatenated_bucket_name(),
                                        Prefix=config.get_non_concatenated_prefix()):
        # e.g. TIME_SERIES/company_id=00224d27-b66f-4b62-bae2-f1399f530d94/60514332-0bc0-4bff-8263-eb6b090b9210.json.gz
        files_batch = [b['Key'] for b in batch if '=' in b['Key']]
        files.extend(files_batch)
    logger.info('Finished listing files')
    return files


def run(spark: SparkSession, config: configuration.Config):
    files = get_json_files_list(boto3.client('s3'), config)
    files_num = len(files)
    logger.info(f'Loaded file list with {files_num} files')
    spark.sparkContext.setJobDescription('Parallelize filenames and read/merge files')
    # Pack many small files into each partition instead of one task per file.
    rdd = spark.sparkContext.parallelize(files, math.ceil(files_num / config.small_files_in_partition))
    logger.info(f'Got an rdd with {rdd.getNumPartitions()} partitions')
    func = partial(merge_json_files, config=config)
    loaded_rdd = rdd.mapPartitions(func)
    # Note: the output is not sorted by time, which will make it harder to ETL/read later.
    loaded_rdd.saveAsTextFile(config.get_concatenated_path())

Boris

From: Sachit Murarka <connectsac...@gmail.com>
Sent: Wednesday, 16 June 2021 21:25
To: spark users <user@spark.apache.org>
Subject: Small file problem

Hello Spark Users,

We are receiving too many small files, about 3 million. Reading them with spark.read alone is taking a long time and the job is not proceeding further. Is there any way to speed this up?

Regards
Sachit Murarka