tdas opened a new pull request #23557: [SPARK-26629] [SS] Fixed error with multiple file stream in a query + restart on a batch that has no data for one file stream
URL: https://github.com/apache/spark/pull/23557
 
 
   ## What changes were proposed in this pull request?
   When a streaming query has multiple file streams, and there is a batch in which one of the file streams has no data, then if the query has to restart from that batch, it will throw the following error.
   ```
   java.lang.IllegalStateException: batch 1 doesn't exist
        at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$.verifyBatchIds(HDFSMetadataLog.scala:300)
        at org.apache.spark.sql.execution.streaming.FileStreamSourceLog.get(FileStreamSourceLog.scala:120)
        at org.apache.spark.sql.execution.streaming.FileStreamSource.getBatch(FileStreamSource.scala:181)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets$2.apply(MicroBatchExecution.scala:294)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets$2.apply(MicroBatchExecution.scala:291)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:291)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:178)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:175)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:175)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:251)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:61)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:175)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:169)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:205)
   ```
   
   The existing `HDFSMetadataLog.verifyBatchIds` threw an error whenever the `batchIds` list was empty. In the context of `FileStreamSource.getBatch` (where verify is called) and `FileStreamSourceLog` (a subclass of `HDFSMetadataLog`), this is usually okay because, in a streaming query with a single file stream, `batchIds` can never be empty:
   - A batch is planned only when the `FileStreamSourceLog` has seen a new offset (that is, there are new data files).
   - So `FileStreamSource.getBatch(X, Y)` is always called with Y > X. Internally this calls `HDFSMetadataLog.verifyBatchIds` over the range X+1 to Y, that is, with Y - X ids.
   
   For example, `FileStreamSource.getBatch(4, 5)` will call `verify(batchIds = Seq(5), start = 5, end = 5)`. However, the invariant Y > X does not hold when there are two file stream sources, as a batch may be planned even when only one of the file streams has new data. The stream without data then gets `FileStreamSource.getBatch(X, X)` -> `verify(batchIds = Seq.empty, start = X+1, end = X)` -> failure.
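   To make the failure mode concrete, here is a simplified, self-contained sketch of the verification behavior described above. `VerifySketch` and its exact messages are illustrative, not the actual `HDFSMetadataLog` code:

```scala
// Illustrative sketch only -- not the real HDFSMetadataLog implementation.
object VerifySketch {
  // Every expected batch id in [start, end] must be present, and (matching the
  // pre-fix behavior described above) an empty batchIds list is rejected outright.
  def verifyBatchIds(batchIds: Seq[Long], start: Long, end: Long): Unit = {
    if (batchIds.isEmpty) {
      throw new IllegalStateException(s"batch $start doesn't exist")
    }
    (start to end).find(id => !batchIds.contains(id)).foreach { id =>
      throw new IllegalStateException(s"batch $id doesn't exist")
    }
  }
}

// Single-stream case: getBatch(4, 5) -> verify(Seq(5), 5, 5) passes.
VerifySketch.verifyBatchIds(Seq(5L), 5L, 5L)

// Restart with two streams, one of which had no data: getBatch(X, X) leads to
// verify(Seq.empty, X + 1, X), which throws on the empty list even though the
// expected range (X + 1) to X is itself empty.
val failed = scala.util.Try(VerifySketch.verifyBatchIds(Seq.empty, 2L, 1L)).isFailure
println(failed)  // true
```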
   
   Note that `FileStreamSource.getBatch(X, X)` gets called **only when restarting a query in a batch where a file source did not have data**. This is because in the normal planning of batches, `MicroBatchExecution` avoids calling `FileStreamSource.getBatch(X, X)` when offset X has not changed. However, when restarting a stream at such a batch, `MicroBatchExecution.populateStartOffsets()` calls `FileStreamSource.getBatch(X, X)` (a DataSource V1 hack to initialize the source with the last known offsets), thus hitting this issue.
   
   The minimal solution here is to skip the verification when `FileStreamSource.getBatch(X, X)` is called, i.e. when the requested range of batch ids is empty.
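   The fix can be sketched as a guard around the verification. The helper names below (`GetBatchGuard`, `safeGetBatchIds`) are hypothetical; the real change lives in `FileStreamSource`/`FileStreamSourceLog`:

```scala
// Hypothetical guard illustrating the proposed fix: for getBatch(X, X)
// (start == end), the expected id range (X + 1) to X is empty, so there is
// nothing to verify and the check is skipped.
object GetBatchGuard {
  // Same pre-fix behavior as described above: empty batchIds always throws.
  def verifyBatchIds(batchIds: Seq[Long], start: Long, end: Long): Unit = {
    if (batchIds.isEmpty) {
      throw new IllegalStateException(s"batch $start doesn't exist")
    }
  }

  def safeGetBatchIds(start: Long, end: Long, batchIds: Seq[Long]): Seq[Long] = {
    if (end > start) {
      // Normal case: at least one new batch, so verification must run.
      verifyBatchIds(batchIds, start + 1, end)
    }
    // getBatch(X, X) on restart falls through without verification.
    batchIds
  }
}

// Restarting on a batch where this source had no data no longer throws.
val ids = GetBatchGuard.safeGetBatchIds(1L, 1L, Seq.empty)
println(ids.isEmpty)  // true
```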
   
   
   ## How was this patch tested?
   
   
