RE: Small file problem

2021-06-17 Thread Boris Litvak
Compact them and remove the small files.
One messy way of doing this (including some cleanup) looks like the following,
based on rdd.mapPartitions() over an RDD of the file URLs:

import gzip
import json
import logging
import math
import re
from typing import List

import boto3
from mypy_boto3_s3.client import S3Client
from pyspark.sql import SparkSession

import configuration
import data_types.time_series
from data_types.shared import series
from s3_batcher import get_s3_objects_batches
from functools import partial

logger = logging.getLogger(__name__)

session = boto3.session.Session()
s3 = session.resource('s3')

def merge_json_files(file_names, config):
    total = []
    exceptions = []
    for name in file_names:
        try:
            logger.debug(f'Loading {name} ...')
            obj = s3.Object(config.get_non_concatenated_bucket_name(), name)
            body = obj.get()['Body'].read()
            if name.endswith('.gz'):
                body = gzip.decompress(body)

            # the company id is encoded in the key, e.g. .../company_id=<uuid>/...
            company_id = re.search(r'company_id=(\S+)/', name).group(1)
            clazz = config.get_schema_class()
            loaded = json.loads(body)
            fixer: series.SchemaFixer = clazz(company_id=company_id, **loaded)
            jason = fixer.fix_value()

            total.append(jason)
        except Exception as ex:
            logger.error(f'{name}: {ex}')
            exceptions.append(name)

    if exceptions:
        logger.warning(f'Exceptions: {exceptions}')
    return iter(total)

def get_json_files_list(s3client: S3Client, config: configuration.Config) -> List[str]:
    """
    Returns the list of S3 object keys under the non-concatenated prefix.
    """
    logger.info('Loading file list')
    files = []
    # Equivalent listing from the CLI:
    # aws s3 ls --summarize --human-readable --recursive \
    #   s3://ingest-measurements-202006101309296531/TIME_SERIES/ --profile hierarchy_playground > list.txt
    for batch in get_s3_objects_batches(s3client,
                                        Bucket=config.get_non_concatenated_bucket_name(),
                                        Prefix=config.get_non_concatenated_prefix()):
        files_batch = [b['Key'] for b in batch if '=' in b['Key']]
        # e.g. TIME_SERIES/company_id=00224d27-b66f-4b62-bae2-f1399f530d94/60514332-0bc0-4bff-8263-eb6b090b9210.json.gz
        files.extend(files_batch)

    logger.info('Finished listing files')
    return files

def run(spark: SparkSession, config: configuration.Config):
    files = get_json_files_list(boto3.client('s3'), config)
    files_num = len(files)
    logger.info(f'Loaded file list with {files_num} files')

    # logger.info(f'Traversing {files}')

    spark.sparkContext.setJobDescription('Parallelize filenames and read/merge files')

    # group several small files into each partition so one task merges a whole batch
    rdd = spark.sparkContext.parallelize(files, math.ceil(files_num / config.small_files_in_partition))
    logger.info(f'Got an rdd with {rdd.getNumPartitions()} partitions')
    func = partial(merge_json_files, config=config)
    loaded_rdd = rdd.mapPartitions(func)

    # destination = r'c:\tmp\jsonresult'
    # shutil.rmtree(destination)
    # print(loaded_rdd.take(2))
    loaded_rdd.saveAsTextFile(config.get_concatenated_path())
    # note: these are not sorted by time, will be hard to etl/read
    # result.write.json(config.get_concatenated_path())

    # df = load_json_df(spark, config.source_path, config.source_partition_keys, config.input_schema)
    # logger.info(f'Schema is {df.schema.json()}')
    # spark.read.json(destination).show(truncate=50)
    # logger.info(f'Read {spark.read.format("parquet").load(config.get_parquet_dir()).count()} rows from parquet')
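
For reference, get_s3_objects_batches (imported above from s3_batcher) is not shown in the snippet. A minimal sketch, assuming it simply wraps boto3's list_objects_v2 paginator and yields each page's 'Contents' list (the real s3_batcher may differ), could look like:

from typing import Any, Dict, Iterator, List

from mypy_boto3_s3.client import S3Client


def get_s3_objects_batches(s3client: S3Client, **list_kwargs) -> Iterator[List[Dict[str, Any]]]:
    """Yield one batch of object metadata dicts per list_objects_v2 page (up to 1000 keys per page by default)."""
    paginator = s3client.get_paginator('list_objects_v2')
    for page in paginator.paginate(**list_kwargs):
        # 'Contents' is absent when a page has no matching keys
        yield page.get('Contents', [])

Each yielded batch is a list of dicts containing at least a 'Key' entry, which is what get_json_files_list() consumes above.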


Boris

From: Sachit Murarka 
Sent: Wednesday, 16 June 2021 21:25
To: spark users 
Subject: Small file problem

Hello Spark Users,

We are receiving too many small files, about 3 million. Reading them using
spark.read itself takes a long time and the job is not proceeding further.

Is there any way to speed this up and proceed?

Regards
Sachit Murarka


Small file problem

2021-06-16 Thread Sachit Murarka
Hello Spark Users,

We are receiving too many small files, about 3 million. Reading them using
spark.read itself takes a long time and the job is not proceeding further.

Is there any way to speed this up and proceed?

Regards
Sachit Murarka


How to handle small file problem in spark structured streaming?

2019-06-10 Thread Shyam P
https://stackoverflow.com/questions/56524539/how-to-handle-small-file-problem-in-spark-structured-streaming


Regards,
Shyam