Compact them and remove the small files.
One (admittedly messy) way of doing this, including some cleanup, looks like the following; it is based on rdd.mapPartitions() over an RDD of the file URLs:
import gzip
import json
import logging
import math
import re
from functools import partial
from typing import List

import boto3
from mypy_boto3_s3.client import S3Client
from pyspark.sql import SparkSession

import configuration
import data_types.time_series
from data_types.shared import series
from s3_batcher import get_s3_objects_batches

logger = logging.getLogger(__name__)

# Module-level boto3 session/resource, used by the workers inside mapPartitions().
session = boto3.session.Session()
s3 = session.resource('s3')
def merge_json_files(file_names, config):
    """Download, fix up and merge one partition's worth of small JSON files."""
    total = []
    exceptions = []
    for name in file_names:
        try:
            logger.debug(f'Loading {name} ...')
            obj = s3.Object(config.get_non_concatenated_bucket_name(), name)
            body = obj.get()['Body'].read()
            if name.endswith('.gz'):
                body = gzip.decompress(body)
            company_id = re.search(r'company_id=(\S+)/', name).group(1)
            clazz = config.get_schema_class()
            loaded = json.loads(body)
            fixer: series.SchemaFixer = clazz(company_id=company_id, **loaded)
            total.append(fixer.fix_value())
        except Exception as ex:
            logger.error(f'{name}: {ex}')
            exceptions.append(name)
    if exceptions:
        logger.warning(f'Exceptions: {exceptions}')
    return iter(total)
def get_json_files_list(s3client: S3Client, config: configuration.Config) -> List[str]:
    """
    Returns the list of object keys under the non-concatenated prefix, e.g.
    TIME_SERIES/company_id=00224d27-b66f-4b62-bae2-f1399f530d94/60514332-0bc0-4bff-8263-eb6b090b9210.json.gz
    """
    # Roughly the equivalent of:
    # aws s3 ls --summarize --human-readable --recursive \
    #     s3://ingest-measurements-202006101309296531/TIME_SERIES/ --profile hierarchy_playground > list.txt
    logger.info('Loading file list')
    files = []
    for batch in get_s3_objects_batches(s3client,
                                        Bucket=config.get_non_concatenated_bucket_name(),
                                        Prefix=config.get_non_concatenated_prefix()):
        # Keep only partitioned keys (the ones containing 'company_id=...').
        files_batch = [b['Key'] for b in batch if '=' in b['Key']]
        files.extend(files_batch)
    logger.info('Finished listing files')
    return files
def run(spark: SparkSession, config: configuration.Config):
    files = get_json_files_list(boto3.client('s3'), config)
    files_num = len(files)
    logger.info(f'Loaded file list with {files_num} files')
    # logger.info(f'Traversing {files}')
    spark.sparkContext.setJobDescription('Parallelize filenames and read/merge files')
    # Aim for roughly config.small_files_in_partition source files per partition,
    # so each mapPartitions() call merges that many small files into one part file.
    rdd = spark.sparkContext.parallelize(
        files, math.ceil(files_num / config.small_files_in_partition))
    logger.info(f'Got an rdd with {rdd.getNumPartitions()} partitions')
    func = partial(merge_json_files, config=config)
    loaded_rdd = rdd.mapPartitions(func)
    # Leftovers from running locally:
    # destination = r'c:\tmp\jsonresult'
    # shutil.rmtree(destination)
    # print(loaded_rdd.take(2))
    loaded_rdd.saveAsTextFile(config.get_concatenated_path())
    # Note: the merged records are not sorted by time, which makes them harder to ETL/read later.
    # result.write.json(config.get_concatenated_path())
    # df = load_json_df(spark, config.source_path,
    #                   config.source_partition_keys, config.input_schema)
    # logger.info(f'Schema is {df.schema.json()}')
    # spark.read.json(destination).show(truncate=50)
    # logger.info(f'Read {spark.read.format("parquet").load(config.get_parquet_dir()).count()} rows from parquet')
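If you want to sanity-check the result or make it friendlier for downstream jobs, something along these lines should work. It is only a sketch: it assumes fix_value() returns JSON strings (as the commented-out spark.read.json line above suggests), the helper name verify_and_convert is made up, and get_concatenated_path() / get_parquet_dir() are the same config accessors used above.

def verify_and_convert(spark: SparkSession, config: configuration.Config):
    # Read back the compacted JSON-lines output produced by saveAsTextFile().
    df = spark.read.json(config.get_concatenated_path())
    logger.info(f'Compacted dataset has {df.count()} rows')
    # Optionally rewrite as Parquet so downstream jobs can skip JSON parsing.
    df.write.mode('overwrite').parquet(config.get_parquet_dir())

Once the compacted copy is in place you can drop the original small files from the non-concatenated bucket.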
Boris
From: Sachit Murarka
Sent: Wednesday, 16 June 2021 21:25
To: spark users
Subject: Small file problem
Hello Spark Users,
We are receiving too many small files, about 3 million. Reading them using
spark.read alone takes a long time and the job does not proceed further.
Is there any way to speed this up and proceed?
Regards
Sachit Murarka