Github user rdblue commented on a diff in the pull request:
https://github.com/apache/spark/pull/20386#discussion_r165117779
--- Diff:
sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
---
@@ -32,40 +32,44 @@
@InterfaceStability.Evolving
public interface StreamWriter extends DataSourceWriter {
/**
- * Commits this writing job for the specified epoch with a list of commit messages. The commit
- * messages are collected from successful data writers and are produced by
- * {@link DataWriter#commit()}.
+ * Commits this writing job for the specified epoch.
*
- * If this method fails (by throwing an exception), this writing job is considered to have been
- * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
+ * When this method is called, the number of commit messages added by
+ * {@link #add(WriterCommitMessage)} equals to the number of input data partitions.
+ *
+ * If this method fails (by throwing an exception), this writing job is considered to to have been
+ * failed, and {@link #abort()} would be called. The state of the destination
+ * is undefined and @{@link #abort()} may not be able to deal with it.
*
* To support exactly-once processing, writer implementations should ensure that this method is
* idempotent. The execution engine may call commit() multiple times for the same epoch
* in some circumstances.
*/
- void commit(long epochId, WriterCommitMessage[] messages);
+ void commit(long epochId);
/**
- * Aborts this writing job because some data writers are failed and keep failing when retry, or
- * the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails.
+ * Aborts this writing job because some data writers are failed and keep failing when retry,
+ * or the Spark job fails with some unknown reasons,
+ * or {@link #commit()} / {@link #add(WriterCommitMessage)} fails
*
* If this method fails (by throwing an exception), the underlying data source may require manual
* cleanup.
*
- * Unless the abort is triggered by the failure of commit, the given messages should have some
- * null slots as there maybe only a few data writers that are committed before the abort
- * happens, or some data writers were committed but their commit messages haven't reached the
- * driver when the abort is triggered. So this is just a "best effort" for data sources to
- * clean up the data left by data writers.
+ * Unless the abort is triggered by the failure of commit, the number of commit
+ * messages added by {@link #add(WriterCommitMessage)} should be smaller than the number
+ * of input data partitions, as there may be only a few data writers that are committed
+ * before the abort happens, or some data writers were committed but their commit messages
+ * haven't reached the driver when the abort is triggered. So this is just a "best effort"
--- End diff --
Best effort is not just how we describe the behavior; it is a requirement
of the contract. Spark should not drop commit messages because it is
convenient. Spark knows which tasks succeeded, which failed, and which were
authorized to commit. That is enough information to provide the best-effort
guarantee.
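
To illustrate the point, here is a minimal sketch of a driver that hands the writer every commit message it has received before calling abort, so cleanup can cover all tasks known to have succeeded. All names here (`BestEffortAbortSketch`, `TrackingWriter`, `FileCommitMessage`, `runAndAbort`) are hypothetical stand-ins and not Spark classes, and the file-path payload is an assumption made only for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; none of these types are Spark's actual classes.
class BestEffortAbortSketch {
  /** Stand-in for whatever a task reports after a successful commit. */
  interface CommitMessage {}

  /** Assumed payload: the path of the output a task wrote. */
  static final class FileCommitMessage implements CommitMessage {
    final String path;
    FileCommitMessage(String path) { this.path = path; }
  }

  /** Driver-side writer that receives messages one at a time, like add() in the diff. */
  static final class TrackingWriter {
    private final List<CommitMessage> received = new ArrayList<>();
    final List<String> cleanedUp = new ArrayList<>();

    void add(CommitMessage message) {
      received.add(message);
    }

    /** Cleans up the output of every task whose message was delivered. */
    void abort() {
      for (CommitMessage m : received) {
        if (m instanceof FileCommitMessage) {
          cleanedUp.add(((FileCommitMessage) m).path);
        }
      }
      received.clear();
    }
  }

  /**
   * Simulates the driver side: every message from a task known to have
   * succeeded is delivered before abort() is called, none are dropped.
   */
  static List<String> runAndAbort(String[] succeededTaskOutputs) {
    TrackingWriter writer = new TrackingWriter();
    for (String path : succeededTaskOutputs) {
      writer.add(new FileCommitMessage(path));
    }
    writer.abort();
    return writer.cleanedUp;
  }
}
```

In this sketch the writer can only clean up what it was told about, which is why delivering the messages is part of the contract and not an optimization.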