Github user rdblue commented on a diff in the pull request:
https://github.com/apache/spark/pull/19269#discussion_r141679290
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A data source writer that is returned by
+ * {@link WriteSupport#createWriter(StructType, SaveMode, DataSourceV2Options)}.
+ * It can mix in various writing optimization interfaces to speed up data saving. The actual
+ * writing logic is delegated to the {@link WriteTask} returned by {@link #createWriteTask()}.
+ *
+ * The writing procedure is:
+ * 1. Create a write task with {@link #createWriteTask()}, then serialize and send it to all
+ *    partitions of the input data (RDD).
+ * 2. For each partition, create a data writer from the write task, and write the data of the
+ *    partition with this writer. If all the data is written successfully, call
+ *    {@link DataWriter#commit()}. If an exception happens during writing, call
+ *    {@link DataWriter#abort()}. This step may repeat several times, as Spark will retry failed
+ *    tasks.
+ * 3. Wait until all the writers/partitions are finished, i.e., either committed or aborted. If
+ *    all partitions are written successfully, call {@link #commit(WriterCommitMessage[])}. If
+ *    some partitions failed and were aborted, call {@link #abort()}.
+ *
+ * Note that data sources are responsible for providing transactionality by implementing the
+ * `commit` and `abort` methods of {@link DataSourceV2Writer} and {@link DataWriter} correctly.
+ * The transaction here is a Spark-level transaction, which may not be the same as the
+ * underlying storage transaction. For example, Spark may successfully write data to a Cassandra
+ * data source, but Cassandra may need some more time to reach consistency at the storage level.
+ */
+@InterfaceStability.Evolving
+public interface DataSourceV2Writer {
+
+  /**
+   * Creates a write task which will be serialized and sent to executors. For each partition of
+   * the input data (RDD), there will be one write task to write the records.
+   */
+  WriteTask<Row> createWriteTask();
+
+  /**
+   * Commits this writing job with a list of commit messages. The commit messages are collected
+   * from all data writers for this writing job and are produced by {@link DataWriter#commit()}.
+   * This also means all the data is written successfully and all data writers are committed.
+   */
+  void commit(WriterCommitMessage[] messages);
+
+  /**
+   * Aborts this writing job because some data writers failed to write the records and aborted.
+   */
+  void abort();
--- End diff ---
Can you explain the situation you're talking about a bit more?
I think Spark should pass everything it can to the abort. I agree that the abort
here should be flexible and best-effort, but there are situations where the data
source can't know how to roll back everything the tasks did without those commit
messages. There may be cases where Spark can't guarantee whether a commit was
complete or not, but where it can, it should pass those commit messages.
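
For reference, here's roughly the driver-side flow the javadoc describes, and where
those messages come from. This is only a sketch: `writeSupport`, `rdd`, `schema`,
`options`, and `runTask` are stand-ins I made up, not code from this PR.

```java
// Sketch of the driver-side write flow (assumed glue code, not from this PR).
DataSourceV2Writer writer = writeSupport.createWriter(schema, SaveMode.Append, options);
WriteTask<Row> task = writer.createWriteTask();   // step 1: created once, sent to executors

List<WriterCommitMessage> messages = new ArrayList<>();
try {
  // step 2: each partition runs the task; runTask stands in for Spark's scheduling
  // and per-task retries, and returns the message produced by DataWriter#commit().
  for (int p = 0; p < rdd.getNumPartitions(); p++) {
    messages.add(runTask(task, rdd, p));
  }
  // step 3: every partition committed, so commit the whole job with all messages.
  writer.commit(messages.toArray(new WriterCommitMessage[0]));
} catch (Exception e) {
  // Some partition could not be committed even after retries. The driver still holds
  // the messages collected so far; my point is that abort should get to see them.
  writer.abort();
  throw e;
}
```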
Say you had 100 successful write tasks out of 400, but the executor died before it
returned the 100th message (though after committing its data), and that failure
pushed the executor failure count over the max, so Spark cancelled the job. Even
though Spark has only 99 commit messages, it should still pass the ones that it
can. In the S3 case, those messages are the only way the driver knows which files
to delete, so the choice is between deleting 99 of the 100 committed files and
leaving all 100 taking up space. I think rolling back 99 is much better than 0.
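
To make the S3 case concrete, this is the kind of cleanup I have in mind. It's
hypothetical: `abort(WriterCommitMessage[])` isn't the signature in this PR, and
`S3CommitMessage`, `s3Client`, and `bucket` are made-up names; the sketch is just
to show why the partial set of messages is worth passing along.

```java
// Hypothetical S3-backed source: each write task uploads to a unique key and
// returns that key in its commit message.
class S3CommitMessage implements WriterCommitMessage {
  final String key;
  S3CommitMessage(String key) { this.key = key; }
}

// If abort received whatever messages the driver did collect, rollback is simple:
void abort(WriterCommitMessage[] messages) {
  for (WriterCommitMessage message : messages) {
    // Best effort: delete the 99 files we have messages for. The task whose message
    // was lost (and any task that never committed) still needs separate cleanup.
    s3Client.deleteObject(bucket, ((S3CommitMessage) message).key);
  }
}
```

The task whose message never made it back would still need some other GC path, but
that doesn't seem like a reason to throw away the messages the driver does have.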
---