GitHub user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158361942
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
@@ -109,6 +125,42 @@ object DataWritingSparkTask extends Logging {
         logError(s"Writer for partition ${context.partitionId()} aborted.")
       })
   }
+
+  def runContinuous(
+      writeTask: DataWriterFactory[InternalRow],
+      context: TaskContext,
+      iter: Iterator[InternalRow]): WriterCommitMessage = {
+    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
+    val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
+    val currentMsg: WriterCommitMessage = null
+    var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+    do {
+      // write the data and commit this writer.
+      Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
+        try {
+          iter.foreach(dataWriter.write)
+          logInfo(s"Writer for partition ${context.partitionId()} is committing.")
+          val msg = dataWriter.commit()
+          logInfo(s"Writer for partition ${context.partitionId()} committed.")
+          EpochCoordinatorRef.get(runId, SparkEnv.get).send(
--- End diff ---
nit: `EpochCoordinatorRef.get` is not cheap. You can call it once before the loop and store the result, instead of looking the endpoint up again on every epoch commit.
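
For illustration, a minimal sketch of that change; `commitMessageFor` and `taskIsActive` are hypothetical placeholders for the message construction and loop condition that the quoted diff truncates:

```scala
// Resolve the coordinator endpoint once per task, before entering the epoch loop.
val epochCoordinator = EpochCoordinatorRef.get(runId, SparkEnv.get)

do {
  Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
    iter.foreach(dataWriter.write)
    val msg = dataWriter.commit()
    // Reuse the cached ref; no per-epoch EpochCoordinatorRef.get lookup.
    epochCoordinator.send(commitMessageFor(currentEpoch, msg)) // hypothetical helper
  })(catchBlock = dataWriter.abort())
  currentEpoch += 1 // assumed epoch advance; the real one follows the truncated diff
} while (taskIsActive) // hypothetical stand-in for the real loop condition
```

Since `runId` comes from a task-local property and does not change while the task runs, a ref resolved once before the loop stays valid for every epoch the task processes.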