Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/20710#discussion_r172303088
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
---
@@ -172,18 +173,22 @@ object DataWritingSparkTask extends Logging {
writeTask: DataWriterFactory[InternalRow],
context: TaskContext,
iter: Iterator[InternalRow]): WriterCommitMessage = {
- val dataWriter = writeTask.createDataWriter(context.partitionId(),
context.attemptNumber())
val epochCoordinator = EpochCoordinatorRef.get(
context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
SparkEnv.get)
val currentMsg: WriterCommitMessage = null
var currentEpoch =
context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
do {
+ var dataWriter: DataWriter[InternalRow] = null
// write the data and commit this writer.
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
try {
- iter.foreach(dataWriter.write)
+ dataWriter = writeTask.createDataWriter(
+ context.partitionId(), context.attemptNumber(), currentEpoch)
+ while (iter.hasNext) {
--- End diff --
is there a reason to change `foreach` to a while loop?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org