ericm-db commented on code in PR #53365:
URL: https://github.com/apache/spark/pull/53365#discussion_r2595701804


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala:
##########
@@ -296,3 +322,84 @@ object OffsetSeqMetadata extends Logging {
     }
   }
 }
+
+/**
+ * This class is used to store the metadata for a source in the offset log 
within the
+ * streaming checkpoint.
+ *
+ * @param sourceId: The ID of the source.
+ * @param providerName: The name of the provider.
+ * @param apiVersion: The API version for the source - whether it is DSv1 or 
DSv2.
+ * @param rewindProviderMetadata: The metadata of the rewind provider.
+ */
+case class SourceMetadataInfo(
+    sourceId: String,
+    providerName: String,
+    apiVersion: String,
+    rewindProviderMetadata: 
org.apache.spark.sql.execution.streaming.RewindProviderMetadata) {
+  def json: String = Serialization.write(this)(OffsetSeqMetadata.format)
+}
+
+/**
+ * This class is used to store the control batch information for a batch in 
the offset log.
+ * Control batches are special batches that do not contain any data and are 
not used
+ * for processing data; they only store metadata or are used for control 
operations (e.g. rewind batch).
+ * Value of None means current batch is a data batch.
+ *
+ * @param rewindInfo: The rewind batch information if the current batch is a 
rewind batch.
+ */
+case class OffsetSeqControlBatchInfo(rewindInfo: 
Option[OffsetSeqRewindBatchInfo] = None)
+
+/**
+ * This class is used to store the rewind batch information for a batch in the 
offset log.
+ *
+ * @param oldBatchId: The batch id of the old batch that was rewound to.
+ */
+case class OffsetSeqRewindBatchInfo(oldBatchId: Long)
+
+/**
+ * Contains metadata associated with an [[OffsetMap]]. This information is
+ * persisted to the offset log in the checkpoint location via the 
[[OffsetMap]] metadata field.
+ *
+ * @param batchWatermarkMs: The current eventTime watermark, used to
+ * bound the lateness of data that will be processed. Time unit: milliseconds
+ * @param batchTimestampMs: The current batch processing timestamp.
+ * Time unit: milliseconds
+ * @param conf: Additional confs to be persisted across batches,
+ * e.g. number of shuffle partitions.
+ * @param sourceMetadataInfo: The source related metadata for the streaming 
query,
+ * mapped by sourceId.
+ * @param controlBatchInfo: The control batch information if the current batch 
is a
+ * control batch (e.g. rewind batch). Value of None means current batch is a 
data batch.
+ */
+case class OffsetSeqMetadataV2(
+    batchWatermarkMs: Long = 0,
+    batchTimestampMs: Long = 0,
+    conf: Map[String, String] = Map.empty,
+    sourceMetadataInfo: Map[String, SourceMetadataInfo] = Map.empty,
+    controlBatchInfo: Option[OffsetSeqControlBatchInfo] = None) extends 
OffsetSeqMetadataBase {

Review Comment:
   Yeah we want to be able to reference these separately



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to