xuanyuanking commented on a change in pull request #27664:
URL: https://github.com/apache/spark/pull/27664#discussion_r425064230
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
##########
@@ -182,19 +182,34 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
}
}
- override def getLatest(): Option[(Long, T)] = {
+ /**
+ * Return the latest batch Id without reading the file. This method only checks for existence of
+ * file to avoid cost on reading and deserializing log file.
+ */
+ def getLatestBatchId(): Option[Long] = {
val batchIds = fileManager.list(metadataPath, batchFilesFilter)
.map(f => pathToBatchId(f.getPath))
.sorted(Ordering.Long.reverse)
for (batchId <- batchIds) {
- val batch = get(batchId)
- if (batch.isDefined) {
- return Some((batchId, batch.get))
+ val batchMetadataFile = batchIdToPath(batchId)
+ if (fileManager.exists(batchMetadataFile)) {
+ return Some(batchId)
}
}
None
}
+ override def getLatest(): Option[(Long, T)] = {
+ getLatestBatchId().map { batchId =>
+ val content = get(batchId).getOrElse {
+ // This only happens in odd case where the file exists when getLatestBatchId() is called,
+ // but get() doesn't find it.
+ throw new IllegalStateException(s"failed to read log file for batch $batchId")
Review comment:
Thanks for the reference; I'll take a look later.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]