viirya commented on code in PR #52620:
URL: https://github.com/apache/spark/pull/52620#discussion_r2459019242
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala:
##########
@@ -263,6 +290,12 @@ class MicroBatchExecution(
case s => s -> ReadLimit.allAvailable()
}.toMap
}
+ if (trigger.isInstanceOf[RealTimeTrigger] && uniqueSources.size !=
sources.size) {
+ throw new SparkIllegalStateException(
+ errorClass =
s"STREAMING_REAL_TIME_MODE.IDENTICAL_SOURCES_IN_UNION_NOT_SUPPORTED",
Review Comment:
Why is this specifically for Union? It looks like it doesn't check whether
there is a Union.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingQueryPlanTraverseHelper.scala:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.runtime
+
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+
+/**
+ * This is a utility object placing methods to traverse the query plan for
streaming query.
+ * This is used for patterns of traversal which are repeated in multiple
places.
+ */
+object StreamingQueryPlanTraverseHelper {
+ def collectFromUnfoldedLatestExecutedPlan[B](
+ lastExecution: IncrementalExecution)(pf: PartialFunction[SparkPlan, B]):
Seq[B] = {
+ val executedPlan = lastExecution.executedPlan
+
+ collectWithUnfolding(executedPlan)(pf)
+ }
+
+ private def collectWithUnfolding[B](executedPlan: SparkPlan)(
+ pf: PartialFunction[SparkPlan, B]): Seq[B] = {
+ executedPlan.flatMap {
+ // InMemoryTableScanExec is a node to represent a cached plan. The node
has underlying
+ // actual executed plan, which we should traverse to collect the
required information.
+ case s: InMemoryTableScanExec =>
collectWithUnfolding(s.relation.cachedPlan)(pf)
Review Comment:
Hm? Do you mean `RealTimeStreamScanExec` in a cache?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingQueryPlanTraverseHelper.scala:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.runtime
+
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+
+/**
+ * This is a utility object placing methods to traverse the query plan for
streaming query.
+ * This is used for patterns of traversal which are repeated in multiple
places.
+ */
+object StreamingQueryPlanTraverseHelper {
Review Comment:
Should we make it private to sql?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RealTimeStreamScanExec.scala:
##########
@@ -37,3 +49,121 @@ object LowLatencyClock {
clock = inputClock
}
}
+
+/**
+ * A wrap reader that turns a Partition Reader extending SupportsRealTimeRead
to a
+ * normal PartitionReader and follow the task termination time
`lowLatencyEndTime`, and
+ * report end offsets in the end to `endOffsets`.
+ */
+case class LowLatencyReaderWrap(
+ reader: SupportsRealTimeRead[InternalRow],
+ lowLatencyEndTime: Long,
+ endOffsets: CollectionAccumulator[PartitionOffsetWithIndex])
+ extends PartitionReader[InternalRow] {
+
+ override def next(): Boolean = {
+ val curTime = LowLatencyClock.getTimeMillis()
+ val ret = if (curTime >= lowLatencyEndTime) {
+ RecordStatus.newStatusWithoutArrivalTime(false)
+ } else {
+ reader.nextWithTimeout(lowLatencyEndTime - curTime)
+ }
+
+ if (!ret.hasRecord) {
+ // The way of using TaskContext.get().partitionId() to map to a partition
+ // may be fragile as it relies on thread locals.
+ endOffsets.add(
+ new PartitionOffsetWithIndex(TaskContext.get().partitionId(),
reader.getOffset)
+ )
+ }
+ ret.hasRecord
+ }
+
+ override def get(): InternalRow = {
+ reader.get()
+ }
+
+ override def close(): Unit = {}
+}
+
+/**
+ * Wrapper factory that creates LowLatencyReaderWrap from reader as
SupportsRealTimeRead
+ */
+case class LowLatencyReaderFactoryWrap(
+ partitionReaderFactory: PartitionReaderFactory,
+ lowLatencyEndTime: Long,
+ endOffsets: CollectionAccumulator[PartitionOffsetWithIndex])
+ extends PartitionReaderFactory
+ with Logging {
+ override def createReader(partition: InputPartition):
PartitionReader[InternalRow] = {
+ val rowReader = partitionReaderFactory.createReader(partition)
+ assert(rowReader.isInstanceOf[SupportsRealTimeRead[InternalRow]])
+ logInfo(
+ log"Creating low latency PartitionReader, stopping at " +
+ log"${MDC(LogKeys.TO_TIME, lowLatencyEndTime)}"
+ )
+ LowLatencyReaderWrap(
+ rowReader.asInstanceOf[SupportsRealTimeRead[InternalRow]],
+ lowLatencyEndTime,
+ endOffsets
+ )
+ }
+}
+
+/**
+ * Physical plan node for scanning a micro-batch of data from a data source.
+ */
Review Comment:
Could we add more documentation to clarify that this is specifically for Real-time Mode (RTM)?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala:
##########
@@ -1078,6 +1182,73 @@ class MicroBatchExecution(
sparkSessionForStream.sessionState.conf)) {
updateStateStoreCkptId(execCtx, latestExecPlan)
}
+
+ var needSignalProgressLock = false
+ // In real-time mode, we delay the offset logging until the end of the
batch.
+ // We first gather the offsets processed up to from all
RealTimeStreamScanExec,
+ // i.e. tasks that execute a source partition. We merge the offsets and
+ // write them to the offset log
+ if (trigger.isInstanceOf[RealTimeTrigger]) {
+ val actualLastExecution = execCtx.executionPlan
+
+ val execs = StreamingQueryPlanTraverseHelper
+ .collectFromUnfoldedLatestExecutedPlan(actualLastExecution) {
+ case e: RealTimeStreamScanExec => e
+ }
+
+ val endOffsetMap = MutableMap[SparkDataStream, OffsetV2]()
+ execs.foreach { e =>
+ val lowLatencyExec = e.asInstanceOf[RealTimeStreamScanExec]
+ val accus: Seq[PartitionOffsetWithIndex] =
+ lowLatencyExec.endOffsetsAccumulator.value.asScala.toSeq
+ val sortedPartitionOffsets =
accus.sortBy(_.index).map(_.partitionOffset).toArray
+ val source = e.stream
+ val endOffset = source
+ .asInstanceOf[SupportsRealTimeMode]
+ .mergeOffsets(sortedPartitionOffsets)
+ endOffsetMap += (source -> endOffset)
+ }
+
+ assert(endOffsetMap.size == execs.size, "Identical sources exist in the
physical nodes" +
+ " which is not supported.")
+
+ execCtx.endOffsets ++= endOffsetMap
+ execCtx.recordEndOffsets(execCtx.endOffsets)
+ execCtx.recordTriggerOffsets(
+ from = execCtx.startOffsets,
+ to = execCtx.endOffsets,
+ latest = execCtx.latestOffsets
+ )
+ execCtx.reportTimeTaken("walCommit") {
+ if (!offsetLog.add(
+ execCtx.batchId,
+ execCtx.endOffsets.toOffsetSeq(sources, execCtx.offsetSeqMetadata)
+ )) {
+ throw QueryExecutionErrors.concurrentStreamLogUpdate(execCtx.batchId)
+ }
+ }
+ logInfo(
+ log"Committed offsets for batch ${MDC(LogKeys.BATCH_ID,
execCtx.batchId)}. Metadata " +
+ log"${MDC(LogKeys.OFFSET_SEQUENCE_METADATA,
execCtx.offsetSeqMetadata)}"
+ )
+ var shouldUpdate = true
+ sources.foreach { s =>
+ execCtx.startOffsets.get(s).foreach { prevOffsets =>
+ if (!prevOffsets.equals(endOffsetMap(s))) {
+ shouldUpdate = false
+ }
+ }
+ }
+ if (shouldUpdate) {
+ // To trigger processAllAvailable() return.
+ noNewData = true
+ // We could signal here, but when the test thread sees
ProcessAllAvailable(), but
+ // signaling after commit log will make it less likely that the caller
of
+ // ProcessAllAvailable() sees offset log written but not commit log.
Review Comment:
I think I get what this is trying to say, but it seems a bit hard to read.
Could we rephrase it?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RealTimeStreamScanExec.scala:
##########
@@ -37,3 +49,121 @@ object LowLatencyClock {
clock = inputClock
}
}
+
+/**
+ * A wrap reader that turns a Partition Reader extending SupportsRealTimeRead
to a
+ * normal PartitionReader and follow the task termination time
`lowLatencyEndTime`, and
+ * report end offsets in the end to `endOffsets`.
+ */
+case class LowLatencyReaderWrap(
+ reader: SupportsRealTimeRead[InternalRow],
+ lowLatencyEndTime: Long,
+ endOffsets: CollectionAccumulator[PartitionOffsetWithIndex])
+ extends PartitionReader[InternalRow] {
+
+ override def next(): Boolean = {
+ val curTime = LowLatencyClock.getTimeMillis()
+ val ret = if (curTime >= lowLatencyEndTime) {
+ RecordStatus.newStatusWithoutArrivalTime(false)
+ } else {
+ reader.nextWithTimeout(lowLatencyEndTime - curTime)
+ }
+
+ if (!ret.hasRecord) {
+ // The way of using TaskContext.get().partitionId() to map to a partition
+ // may be fragile as it relies on thread locals.
+ endOffsets.add(
+ new PartitionOffsetWithIndex(TaskContext.get().partitionId(),
reader.getOffset)
+ )
+ }
+ ret.hasRecord
+ }
+
+ override def get(): InternalRow = {
+ reader.get()
+ }
+
+ override def close(): Unit = {}
+}
+
+/**
+ * Wrapper factory that creates LowLatencyReaderWrap from reader as
SupportsRealTimeRead
+ */
+case class LowLatencyReaderFactoryWrap(
+ partitionReaderFactory: PartitionReaderFactory,
+ lowLatencyEndTime: Long,
+ endOffsets: CollectionAccumulator[PartitionOffsetWithIndex])
+ extends PartitionReaderFactory
+ with Logging {
+ override def createReader(partition: InputPartition):
PartitionReader[InternalRow] = {
+ val rowReader = partitionReaderFactory.createReader(partition)
+ assert(rowReader.isInstanceOf[SupportsRealTimeRead[InternalRow]])
+ logInfo(
+ log"Creating low latency PartitionReader, stopping at " +
+ log"${MDC(LogKeys.TO_TIME, lowLatencyEndTime)}"
+ )
+ LowLatencyReaderWrap(
+ rowReader.asInstanceOf[SupportsRealTimeRead[InternalRow]],
+ lowLatencyEndTime,
+ endOffsets
+ )
+ }
+}
+
+/**
+ * Physical plan node for scanning a micro-batch of data from a data source.
+ */
+case class RealTimeStreamScanExec(
+ output: Seq[Attribute],
+ @transient scan: Scan,
+ @transient stream: MicroBatchStream,
+ @transient start: Offset,
+ batchDurationMs: Long)
+ extends DataSourceV2ScanExecBase {
+
+ override def keyGroupedPartitioning: Option[Seq[Expression]] = None
+
+ override def ordering: Option[Seq[SortOrder]] = None
+
+ val endOffsetsAccumulator: CollectionAccumulator[PartitionOffsetWithIndex] =
{
+ assert(stream.isInstanceOf[SupportsRealTimeMode])
Review Comment:
It seems this assert can be moved out of the `endOffsetsAccumulator`
initialization and into `RealTimeStreamScanExec` itself?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala:
##########
@@ -244,6 +244,14 @@ abstract class ProgressContext(
currentTriggerLatestOffsets = latest.transform((_, v) => v.json)
}
+ /**
+ * Only used by Real-time Mode. For other cases, end offsets are determined
+ * in the batch planning phase so it is never need to be updated.
+ */
+ def recordEndOffsets(to: StreamProgress): Unit = {
+ currentTriggerEndOffsets = to.transform((_, v) => v.json)
+ }
Review Comment:
Can we move this to `MicroBatchExecutionContext`? I think it is specific to
that context rather than a general method for all `ProgressContext`s.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala:
##########
@@ -333,6 +375,29 @@ class MicroBatchExecution(
private def initializeExecution(
sparkSessionForStream: SparkSession): MicroBatchExecutionContext = {
+ var latestStartedBatch = offsetLog.getLatest()
+ val latestCommittedBatch = commitLog.getLatest()
+
+ val lastCommittedBatchId = latestCommittedBatch match {
+ case Some((batchId, _)) => batchId
+ case _ => -1L
+ }
+
+ // For a query running in Real-time Mode that fails after
+ // writing to offset log but before writing to commit log, we delete the
extra
+ // entries in offsetLog to sync up. Note that this also means async
checkpoint rollback handling
+ // is not compatible with low latency mode at this stage.
Review Comment:
low latency mode -> Real-time Mode. It would be better to keep the
terminology consistent in the doc.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RealTimeStreamScanExec.scala:
##########
@@ -37,3 +49,121 @@ object LowLatencyClock {
clock = inputClock
}
}
+
+/**
+ * A wrap reader that turns a Partition Reader extending SupportsRealTimeRead
to a
+ * normal PartitionReader and follow the task termination time
`lowLatencyEndTime`, and
+ * report end offsets in the end to `endOffsets`.
+ */
+case class LowLatencyReaderWrap(
+ reader: SupportsRealTimeRead[InternalRow],
+ lowLatencyEndTime: Long,
+ endOffsets: CollectionAccumulator[PartitionOffsetWithIndex])
+ extends PartitionReader[InternalRow] {
+
+ override def next(): Boolean = {
+ val curTime = LowLatencyClock.getTimeMillis()
+ val ret = if (curTime >= lowLatencyEndTime) {
+ RecordStatus.newStatusWithoutArrivalTime(false)
+ } else {
+ reader.nextWithTimeout(lowLatencyEndTime - curTime)
+ }
+
+ if (!ret.hasRecord) {
+ // The way of using TaskContext.get().partitionId() to map to a partition
+ // may be fragile as it relies on thread locals.
Review Comment:
What does it mean? Is it a problem for RTM code?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/PartitionOffsetWithIndex.scala:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.internal.connector;
+
+import java.io.Serializable;
+
+import org.apache.spark.sql.connector.read.streaming.PartitionOffset;
+
+/**
+ * Internal class for real time mode to pass partition offset from executors
to the driver.
+ */
+private[sql] case class PartitionOffsetWithIndex(index: Long, partitionOffset:
PartitionOffset)
+ extends Serializable;
Review Comment:
I think a Scala case class automatically extends `Serializable`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]