Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/20386#discussion_r164918470
--- Diff:
sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
---
@@ -32,40 +32,44 @@
@InterfaceStability.Evolving
public interface StreamWriter extends DataSourceWriter {
/**
- * Commits this writing job for the specified epoch with a list of commit messages. The commit
- * messages are collected from successful data writers and are produced by
- * {@link DataWriter#commit()}.
+ * Commits this writing job for the specified epoch.
*
- * If this method fails (by throwing an exception), this writing job is considered to have been
- * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
+ * When this method is called, the number of commit messages added by
+ * {@link #add(WriterCommitMessage)} equals to the number of input data partitions.
+ *
+ * If this method fails (by throwing an exception), this writing job is considered to to have been
+ * failed, and {@link #abort()} would be called. The state of the destination
+ * is undefined and @{@link #abort()} may not be able to deal with it.
*
* To support exactly-once processing, writer implementations should ensure that this method is
* idempotent. The execution engine may call commit() multiple times for the same epoch
--- End diff --
The StreamWriter is responsible for setting up a distributed transaction to
commit the data within a batch both locally and to the remote system. But the
StreamExecution keeps its own log of which batches have been fully completed.
("Fully completed" includes things like stateful aggregation commit and
progress logging, which can't reasonably participate in the StreamWriter's
transaction.)
So there's a scenario where Spark fails between the StreamWriter commit and the
StreamExecution commit, in which case the StreamExecution must re-execute the
batch to ensure everything is in the right state. The StreamWriter is
responsible for ensuring this re-execution doesn't generate duplicate data in
the remote system.
Note that the "true" exactly-once strategy, where the StreamWriter aborts
the retried batch because it was already committed before, is indeed idempotent
with respect to StreamWriter.commit(epochId). But there are weaker strategies
which still provide equivalent semantics.
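To make the retry scenario concrete, here is a minimal sketch of the "skip the retried epoch because it was already committed" strategy. It deliberately does not use the Spark API: `EpochIdempotentSink`, its in-memory row list, and the `lastCommittedEpoch` field are all hypothetical stand-ins for a remote system, and a real writer would have to persist the epoch marker atomically with the data.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a remote sink; not part of Spark's API.
final class EpochIdempotentSink {
    // Rows that have been durably committed (in memory here, for illustration only).
    private final List<String> committedRows = new ArrayList<>();

    // Highest epoch whose data has been committed; -1 means nothing committed yet.
    private long lastCommittedEpoch = -1L;

    /**
     * Commits the rows for the given epoch.
     *
     * @return true if the rows were written, false if the epoch had already
     *         been committed and the call was treated as a no-op.
     */
    synchronized boolean commit(long epochId, List<String> rows) {
        if (epochId <= lastCommittedEpoch) {
            // Retried epoch: the earlier commit already succeeded, so skip it.
            return false;
        }
        // Assumption: in a real sink these two updates form one atomic transaction.
        committedRows.addAll(rows);
        lastCommittedEpoch = epochId;
        return true;
    }

    // Returns a copy so callers cannot mutate the sink's state.
    synchronized List<String> committedRows() {
        return new ArrayList<>(committedRows);
    }

    synchronized long lastCommittedEpoch() {
        return lastCommittedEpoch;
    }
}
```

If the engine re-executes epoch 0 after a failure between the two commits, the second `commit(0, ...)` call returns false and leaves the sink unchanged, so calling commit any number of times for the same epoch has the same effect as calling it once.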