GitHub user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20710#discussion_r171993276
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
---
@@ -172,17 +173,19 @@ object DataWritingSparkTask extends Logging {
writeTask: DataWriterFactory[InternalRow],
context: TaskContext,
iter: Iterator[InternalRow]): WriterCommitMessage = {
- val dataWriter = writeTask.createDataWriter(context.partitionId(),
context.attemptNumber())
val epochCoordinator = EpochCoordinatorRef.get(
context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
SparkEnv.get)
val currentMsg: WriterCommitMessage = null
var currentEpoch =
context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
do {
+ var dataWriter: DataWriter[InternalRow] = null
// write the data and commit this writer.
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
try {
+ dataWriter = writeTask.createDataWriter(
+ context.partitionId(), context.attemptNumber(), currentEpoch)
iter.foreach(dataWriter.write)
--- End diff --
Fix this! Don't use `foreach`.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]