Hello,

Thank you very much for your replies!

I have done the same calculations πŸ˜ƒ
I could increase the task size now. It would work for another million of files, 
but I will hit this limit very soon again. My hope was that I missed a config 
property or a Streaming Query option, that changes this β€œcollect all” behavior.

Commits to Iceberg are way less frequent: the stream has β€œ1 hour” trigger time. 
And I also run optimization procedures regularly.

Best regards,
Anastasiia

From: Andrei L <andrewlopuk...@gmail.com>
Date: Monday, 12. May 2025 at 23:44
To: εˆ˜ε”― <z920631...@gmail.com>
Cc: Anastasiia Sokhova <anastasiia.sokh...@honic.eu.invalid>, 
user@spark.apache.org <user@spark.apache.org>
Subject: Re: Structured Streaming Initial Listing Issue
You don't often get email from andrewlopuk...@gmail.com. Learn why this is 
important<https://aka.ms/LearnAboutSenderIdentification>

Hello.



AFAIK the problem is not scoped to streaming and can't be mitigated with only 
maxRedultSize for such input. Spark has to bring all file paths into driver 
memory even in case of streaming 
(https://github.com/apache/spark/blob/37028fafc4f9fc873195a88f0840ab69edcf9d2b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala#L359)

I assume memory consumption is at least ~100 bytes per file for metadata (at 
least org.apache.hadoop.fs.FileStatus instance per file),

When there are 10^9 files in play, with naive approach you will expect an order 
of 100-1000Gb required driver memory just to list all files. Given provided 
ratio (1073 mb per 5.5 mln files) you may need 200Gb driver memory for 10^9 
files. If driver heap was less then 32 gb you can expect even worse ratio due 
to (un)compressed oops.

Possible solutions I can think of:

* Preprocessing jobs to merge json files/dump them to kafka.

* Data source modification (e.g. time based grouping).

* Sophisticated sliding window technique with manual sliding. Simplest one: 
process one partition (day) at a time.

* Custom input stream source.



Disclaimer: this answer is personal opinion based on my spark knowledge and not 
very deep source code analysis.

P.s. Iceberg files/metadata maintaince should also be considered with care for 
such frequent appends.



Sincerely, Andrei L.

ΠΏΠ½, 12 мая 2025 Π³., 21:26 εˆ˜ε”― 
<z920631...@gmail.com<mailto:z920631...@gmail.com>>:
That 1073.3 MiB isn't too much bigger than spark.driver.maxResultSize, can't 
you just increase that config with a larger number?

/ Wei

Anastasiia Sokhova <anastasiia.sokh...@honic.eu.invalid> 于2025εΉ΄4月16ζ—₯周三 03:37ε†™ι“οΌš
Dear Spark Community,

I run a Structured Streaming Query to read json files from S3 into an Iceberg 
table.  This is my query:

```python
stream_reader = (
    spark_session.readStream.format("json")
    .schema(schema)
    .option("maxFilesPerTrigger", 256_000)
    .option("basePath", f"s3a://test-bucket/root_dir/")
    .load(f"s3a://test-bucket/root_dir/2025/04/")
    .coalesce(8)
    .withColumn("object_key", input_file_name())
)
stream = (
    stream_reader.writeStream.queryName(f"test_stream")
    .format("iceberg")
    .outputMode("append")
    .option("checkpointLocation", 
f"s3a://ttest-bucket/checkpoints/{uuid.uuid4()}/")
    .trigger(processingTime="10 seconds")
    .toTable(target_table_full_name)
)
```

My data on S3 has this structure:
```
root_dir/
└── 2025/
    └── 04/
        β”œβ”€β”€ 15/
        β”‚   β”œβ”€β”€ 123e4567-e89b-12d3-a456-426614174003.json
        β”‚   └── 123e4567-e89b-12d3-a456-426614174004.json
        β”œβ”€β”€ 16/
        β”‚   β”œβ”€β”€ 123e4567-e89b-12d3-a456-426614174000.json
        β”‚   β”œβ”€β”€ 123e4567-e89b-12d3-a456-426614174001.json
        β”‚   └── 123e4567-e89b-12d3-a456-426614174002.json
        └── 17/
            β”œβ”€β”€ 123e4567-e89b-12d3-a456-426614174005.json
            └── 123e4567-e89b-12d3-a456-426614174006.json
```

These are millions of 1.5KB files.

I encounter issues with the initial listing: when I start the stream I see this 
log:
```
Total size of serialized results of 51 tasks (1073.3 MiB) is bigger than 
spark.driver.maxResultSize (1024.0 MiB)
```

51 is the number of sub directories I have in my test setup. It seems that 
Spark recognises sub directories as partitions, and does the listing per 
partition, but in the end still aggregates everything. This error happens for 
total 5.5 mln files.
Setting maxFilesPerTrigger does not help to limit this initial listing either.

Please, give me a hint in how to handle this initial listing for potentially 
billions of files.

My setup is a standalone Spark 3.5.1 cluster with Spark Connect.

Best regards,
Anastasiia

Reply via email to