liviazhu commented on code in PR #53365:
URL: https://github.com/apache/spark/pull/53365#discussion_r2600379082
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OfflineStateRepartitionRunner.scala:
##########
@@ -208,8 +208,8 @@ class OfflineStateRepartitionRunner(
// Create a new OffsetSeq from the last committed but with an update num shuffle partitions
val newOffsetSeq = lastCommittedOffsetSeq match {
case v1: OffsetSeq =>
- val metadata = v1.metadata.get
- v1.copy(metadata = Some(metadata.copy(
+ val metadata = v1.metadataOpt.get.asInstanceOf[OffsetSeqMetadata]
+ v1.copy(metadataOpt = Some(metadata.copy(
conf = metadata.conf + (SQLConf.SHUFFLE_PARTITIONS.key ->
numPartitions.toString))))
case _ => throw
OfflineStateRepartitionErrors.unsupportedOffsetSeqVersionError(
Review Comment:
Should we add a TODO here and file a ticket to track adding `OffsetMap` support?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeqLog.scala:
##########
@@ -138,11 +138,11 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String)
}
object OffsetSeqLog {
- private[streaming] val VERSION_1 = 1
- private[streaming] val VERSION_2 = 2
+ val VERSION_1 = 1
Review Comment:
Should we keep these constants scoped (e.g., `private[streaming]`, as before) instead of making them fully public?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala:
##########
@@ -110,11 +107,25 @@ object OffsetSeq {
* Returns a [[OffsetSeq]] with metadata and a variable sequence of offsets.
* `nulls` in the sequence are converted to `None`s.
*/
- def fill(metadata: Option[String], offsets: OffsetV2*): OffsetSeq = {
- OffsetSeq(offsets.map(Option(_)), metadata.map(OffsetSeqMetadata.apply))
+ def fill(metadataOpt: Option[String], offsets: OffsetV2*): OffsetSeq = {
+ OffsetSeq(offsets.map(Option(_)), metadataOpt.map(OffsetSeqMetadata.apply))
+ }
+
+ /**
+ * Returns a [[OffsetSeqV2]] with metadata and a variable sequence of offsets.
+ * `nulls` in the sequence are converted to `None`s.
+ */
+ def fillV2(metadataOpt: Option[String], offsets: OffsetV2*): OffsetSeqV2 = {
+ OffsetSeqV2(offsets.map(Option(_)), metadataOpt.map(OffsetSeqMetadataV2.apply))
}
}
+/**
+ * Version 2 of OffsetSeq that uses OffsetSeqMetadataV2 for metadata.
+ */
+case class OffsetSeqV2(
+ offsets: Seq[Option[OffsetV2]],
+ metadataOpt: Option[OffsetSeqMetadataV2] = None) extends OffsetSeqBase
Review Comment:
Should metadata be required for `OffsetSeqV2` (i.e., a plain `OffsetSeqMetadataV2` field rather than an `Option`)?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala:
##########
@@ -124,7 +135,7 @@ object OffsetSeq {
*/
case class OffsetMap(
offsetsMap: Map[String, Option[OffsetV2]],
- metadataOpt: Option[OffsetSeqMetadata] = None) extends OffsetSeqBase {
+ metadataOpt: Option[OffsetSeqMetadataBase] = None) extends OffsetSeqBase {
Review Comment:
Same question as above: should metadata be required for `OffsetMap` rather than an `Option`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]