Hi, I'm trying out Spark Structured Streaming for the first time and have a question about MicroBatchExecution/FileStreamSource. In particular, it doesn't seem to respect maxFilesPerTrigger for the first micro-batch. I'm not sure whether I'm doing something wrong or this is intentional.
Pseudocode I'm running:

    val data = spark.readStream
      .option("maxFilesPerTrigger", "10")
      .textFile(path)

    data.writeStream
      .format("parquet")
      .option("checkpointLocation", checkpointPath)
      .option("path", tablePath)
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()

What I see in the logs is that while maxFilesPerBatch does get set to 10, the first batch nevertheless processes all 8184 files that exist at that time:

    21/04/29 19:43:43 INFO FileStreamSource: maxFilesPerBatch = Some(10), maxFileAgeMs = 604800000
    21/04/29 19:43:43 INFO MicroBatchExecution: Using Source [FileStreamSource[s3://.../.../2021/04/28/*]] from DataSourceV1 named 'FileSource[s3://.../.../2021/04/28/*]' [DataSource(org.apache.spark.sql.SparkSession@483828b3,text,List(),None,List(),None,Map(maxFilesPerTrigger -> 10, path -> s3://.../.../2021/04/28/*),None)]
    21/04/29 19:43:43 INFO MicroBatchExecution: no commit log present
    21/04/29 19:43:43 INFO MicroBatchExecution: Resuming at batch 0 with committed offsets {} and available offsets {FileStreamSource[s3://.../.../2021/04/28/*]: {"logOffset":0}}
    21/04/29 19:43:43 INFO MicroBatchExecution: Stream started from {}
    21/04/29 19:43:44 INFO FileStreamSource: Processing 8184 files from 0:0

In the code, it looks like getOffset, which *is* affected by maxFilesPerBatch, is only called when there is already an existing offset:
https://github.com/apache/spark/blob/e8bf8fe213c0f66f6d32f845f4dc391fa5c530f3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L402

1. Am I just doing something incorrectly here, and is there some way to get the behavior I want?

2. If there isn't, here is the problem I'm trying to solve and how I hoped to solve it. Can anyone suggest any workarounds?

The source is date/hour-partitioned folders on S3, with data going back about 10 years. Each day has roughly 10k files, which arrive in irregular chunks. I want to end up with slightly transformed data somewhere else on S3, with a latency of a few minutes. I was hoping to start a job once a day that reads just that date's folder. However, to do that I need to start the job after there is already some data on S3 at that path, and the files sometimes arrive in a large chunk at the start of the day (e.g. 100 files right away, then more like 10 files a minute), so that first chunk all lands in one batch. There are possible hacks like starting the job well before there is any data to process, but this doesn't feel great (e.g. I can't just rerun the same job later with the same resource management).

Would be really grateful for any thoughts, thank you!

-Dima Kamalov
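
In case a self-contained version helps, here is roughly the full job. The S3 paths are placeholders standing in for the real ones, so treat this as a sketch of what I'm running rather than the exact code:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger

    object MaxFilesPerTriggerRepro {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("maxFilesPerTrigger-first-batch")
          .getOrCreate()

        // Placeholder paths -- the real job points at one day's date/hour folders on S3.
        val inputPath      = "s3://bucket/input/2021/04/28/*"
        val checkpointPath = "s3://bucket/checkpoints/repro"
        val outputPath     = "s3://bucket/output/repro"

        // maxFilesPerTrigger should cap each micro-batch at 10 files,
        // but the very first batch picks up every file already present.
        val data = spark.readStream
          .option("maxFilesPerTrigger", "10")
          .textFile(inputPath)

        val query = data.writeStream
          .format("parquet")
          .option("checkpointLocation", checkpointPath)
          .option("path", outputPath)
          .trigger(Trigger.ProcessingTime("10 seconds"))
          .start()

        query.awaitTermination()
      }
    }

The only differences from the pseudocode above are the made-up paths and the explicit awaitTermination() so it can run as a standalone main().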