uncleGen commented on a change in pull request #24283: [SPARK-27355][SS] Make query execution more sensitive to epoch message late or lost
URL: https://github.com/apache/spark/pull/24283#discussion_r272902157
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
 ##########
 @@ -224,24 +227,24 @@ private[continuous] class EpochCoordinator(
         partitionOffsets.collect { case ((e, _), o) if e == epoch => o }
       if (thisEpochOffsets.size == numReaderPartitions) {
        logDebug(s"Epoch $epoch has offsets reported from all partitions: $thisEpochOffsets")
-        query.addOffset(epoch, stream, thisEpochOffsets.toSeq)
+        queryExecution.addOffset(epoch, stream, thisEpochOffsets.toSeq)
         resolveCommitsAtEpoch(epoch)
       }
       checkProcessingQueueBoundaries()
   }
 
   private def checkProcessingQueueBoundaries() = {
-    if (partitionOffsets.size > epochBacklogQueueSize) {
-      query.stopInNewThread(new IllegalStateException("Size of the partition offset queue has " +
-        "exceeded its maximum"))
+    if (partitionOffsets.size > epochMsgBacklogQueueSize) {
 
 Review comment:
   Hmm... you remind me that we'd better use a new config, even though `CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE` is a newly introduced config in Spark 3.0.0-SNAPSHOT.
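   A minimal sketch of what a dedicated entry could look like if it sat next to the existing one in `SQLConf` (the key name, doc text, and default below are hypothetical, not actual Spark definitions):

```scala
// Hypothetical companion to CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE, written
// as it might appear inside the SQLConf object (buildConf is the ConfigBuilder
// helper used there). Key name, doc text, and default are illustrative only.
val CONTINUOUS_STREAMING_EPOCH_MSG_BACKLOG_QUEUE_SIZE =
  buildConf("spark.sql.streaming.continuous.epochMsgBacklogQueueSize")
    .doc("The max number of pending epoch messages (reported offsets and commits) " +
      "the EpochCoordinator may buffer before stopping the query with an error.")
    .intConf
    .createWithDefault(10000)

// EpochCoordinator could then read it separately from the partition-offset limit,
// e.g. (assuming the coordinator keeps a reference to the SparkSession):
// private val epochMsgBacklogQueueSize =
//   session.sqlContext.conf.getConf(SQLConf.CONTINUOUS_STREAMING_EPOCH_MSG_BACKLOG_QUEUE_SIZE)
```

   Splitting the two limits this way would let users tune the message backlog independently of the partition-offset backlog, instead of overloading the existing config.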

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]

