Hi, I am trying to use Spark Structured Streaming for the first time and
have a question about MicroBatchExecution/FileStreamSource.  In particular,
FileStreamSource doesn't seem to respect maxFilesPerTrigger for the first
micro-batch.  I'm not sure whether I'm doing something wrong or whether this
is intentional.

Pseudocode of what I'm running (paths are placeholders):

import org.apache.spark.sql.streaming.Trigger

val data = spark.readStream
      .option("maxFilesPerTrigger", "10")
      .textFile(path)

data.writeStream
      .format("parquet")
      .option("checkpointLocation", checkpointPath)
      .option("path", tablePath)
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()

What I see in the logs is that while maxFilesPerBatch does get set to 10,
the first batch nevertheless processes all 8184 files that exist at that
time:

21/04/29 19:43:43 INFO FileStreamSource: maxFilesPerBatch = Some(10),
maxFileAgeMs = 604800000
21/04/29 19:43:43 INFO MicroBatchExecution: Using Source
[FileStreamSource[s3://.../.../2021/04/28/*]] from DataSourceV1 named
'FileSource[s3://.../.../2021/04/28/*]'
[DataSource(org.apache.spark.sql.SparkSession@483828b3,text,List(),None,List(),None,Map(maxFilesPerTrigger
-> 10, path -> s3://.../.../2021/04/28/*),None)]
21/04/29 19:43:43 INFO MicroBatchExecution: no commit log present
21/04/29 19:43:43 INFO MicroBatchExecution: Resuming at batch 0 with
committed offsets {} and available offsets
{FileStreamSource[s3://.../.../2021/04/28/*]: {"logOffset":0}}
21/04/29 19:43:43 INFO MicroBatchExecution: Stream started from {}
21/04/29 19:43:44 INFO FileStreamSource: Processing 8184 files from 0:0

In the code, it looks like getOffset, which *is* affected by maxFilesPerBatch,
is only called when there's an existing offset
(https://github.com/apache/spark/blob/e8bf8fe213c0f66f6d32f845f4dc391fa5c530f3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L402)

1. Am I just doing something incorrectly here, and is there some way to get
the behavior I want?

2. If there isn't, here's the problem I'm trying to solve and how I hoped to
solve it.  Can anyone suggest any workarounds?

The source is date/hour-partitioned folders on S3, with about 10 years of
data.  There are roughly 10k files per day, and they arrive in irregular
chunks.  I want to end up with slightly transformed data somewhere else on
S3, with a latency of a few minutes.

I was hoping to start a job once a day that reads just that date's folder.
However, to do that I need to start after some data already exists on S3 at
that path, and the files sometimes arrive in a large chunk at the start of
the day (e.g. 100 files right away, then more like 10 files a minute).
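
For concreteness, here is roughly what I had in mind for the daily driver.
The bucket layout (s3://bucket/prefix/yyyy/MM/dd/*), app name, and the
checkpoint/output locations below are made-up placeholders:

import java.time.LocalDate
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("daily-ingest").getOrCreate()

// Each run of the job points the stream at a single day's folder.
val day = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy/MM/dd"))
val inputPath      = s"s3://bucket/prefix/$day/*"     // placeholder layout
val checkpointPath = s"s3://bucket/checkpoints/$day"  // placeholder
val tablePath      = s"s3://bucket/output/$day"       // placeholder

val data = spark.readStream
      .option("maxFilesPerTrigger", "10")  // at most 10 files per micro-batch (what I'd like)
      .textFile(inputPath)

data.writeStream
      .format("parquet")
      .option("checkpointLocation", checkpointPath)
      .option("path", tablePath)
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()
      .awaitTermination()

The idea was that maxFilesPerTrigger would keep each micro-batch small even
when a large chunk of files is already sitting in the folder at startup.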

There seem to be some possible hacks, like starting the job a lot earlier,
before there is any data to process, though that doesn't feel great (e.g. I
can't just rerun the same job later with the same resource management).
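
For illustration, that "start early" hack would look roughly like the
following, reusing data/checkpointPath/tablePath from the pseudocode above;
the 26-hour window is an arbitrary number I picked, and
awaitTermination(timeoutMs)/stop() are the standard StreamingQuery calls:

// Launch the stream before any files land, let it idle through the day,
// then shut it down once the day (plus some slack) is over.
val query = data.writeStream
      .format("parquet")
      .option("checkpointLocation", checkpointPath)
      .option("path", tablePath)
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()

// Block for up to 26 hours, then stop the query cleanly.
query.awaitTermination(26L * 60 * 60 * 1000)
query.stop()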

Would be really grateful for any thoughts, thank you!

-Dima Kamalov


