Github user steveloughran commented on a diff in the pull request:
https://github.com/apache/spark/pull/19269#discussion_r142004644
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A data source writer that is returned by
+ * {@link WriteSupport#createWriter(StructType, SaveMode, DataSourceV2Options)}.
+ * It can mix in various writing optimization interfaces to speed up data saving. The actual
+ * writing logic is delegated to the {@link WriteTask} returned by {@link #createWriteTask()}.
+ *
+ * The writing procedure is:
+ * 1. Create a write task with {@link #createWriteTask()}, serialize it, and send it to all the
+ *    partitions of the input data (RDD).
+ * 2. For each partition, create a data writer with the write task, and write the data of the
+ *    partition with this writer. If all the data is written successfully, call
+ *    {@link DataWriter#commit()}. If an exception happens during writing, call
+ *    {@link DataWriter#abort()}. This step may repeat several times, as Spark will retry failed
+ *    tasks.
+ * 3. Wait until all the writers/partitions have finished, i.e., either committed or aborted. If
+ *    all partitions were written successfully, call {@link #commit(WriterCommitMessage[])}. If
+ *    some partitions failed and were aborted, call {@link #abort()}.
+ *
+ * Note that data sources are responsible for providing transactional behavior by correctly
+ * implementing the `commit` and `abort` methods of {@link DataSourceV2Writer} and
+ * {@link DataWriter}. The transaction here is a Spark-level transaction, which may not map to a
+ * transaction in the underlying storage. For example, Spark may successfully write data to a
+ * Cassandra data source, but Cassandra may need some more time to reach consistency at the
+ * storage level.
+ */
[email protected]
+public interface DataSourceV2Writer {
+
+ /**
+ * Creates a write task which will be serialized and sent to executors. For each partition of the
+ * input data (RDD), there will be one write task to write the records.
+ */
+ WriteTask<Row> createWriteTask();
+
+ /**
+ * Commits this writing job with a list of commit messages. The commit messages are collected
+ * from all data writers for this writing job and are produced by {@link DataWriter#commit()}.
+ * This also means that all the data has been written successfully and all data writers have
+ * committed.
+ */
+ void commit(WriterCommitMessage[] messages);
+
+ /**
+ * Aborts this writing job because some data writers failed to write the records and aborted.
+ */
+ void abort();
--- End diff --
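To make the commit protocol described in the class Javadoc concrete, here is a minimal, hypothetical implementation skeleton. `ExampleWriter` is an illustrative name only and is not part of this PR; the write task itself is elided.

```java
import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
import org.apache.spark.sql.sources.v2.writer.WriteTask;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

// Hypothetical skeleton, not part of this PR.
public class ExampleWriter implements DataSourceV2Writer {

  @Override
  public WriteTask<Row> createWriteTask() {
    // Step 1: the returned task is serialized and sent to each partition of the input RDD.
    // A real implementation would return its own WriteTask<Row>; it is elided in this sketch.
    throw new UnsupportedOperationException("write task elided in this sketch");
  }

  @Override
  public void commit(WriterCommitMessage[] messages) {
    // Step 3, success path: every task committed; `messages` holds one
    // DataWriter#commit() result per task, telling the writer what each task
    // staged so it can be published/finalized here.
    for (WriterCommitMessage message : messages) {
      // e.g. finalize whatever the corresponding task staged
    }
  }

  @Override
  public void abort() {
    // Step 3, failure path: no messages are passed in, so the writer must be
    // able to find and discard partially written output on its own.
  }
}
```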
Knowing what's being committed can provide a bit more information as to
what is going on, and would be appreciated for that. The biggest issue I fear
is loss of the driver itself, so *no* abort() call is made at all. A strategy
for cleaning up from that would be good, even though it's primarily one of
bringing up a new writer and saying "clean up the mess a previous instance
left". Looking at the S3 example, my strategy there is/would be: identify all
pending commits, abort them, then clean up the dest dir.
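A rough sketch of that recovery pass, assuming the pending commits are incomplete S3 multipart uploads under the destination prefix (AWS SDK for Java v1; the class name, single-page listings, and credentials handling are simplified for illustration):

```java
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
import com.amazonaws.services.s3.model.MultipartUpload;
import com.amazonaws.services.s3.model.S3ObjectSummary;

public class OrphanedWriteCleanup {

  public static void cleanup(AmazonS3 s3, String bucket, String destPrefix) {
    // 1. Identify all pending commits (incomplete multipart uploads) under the destination.
    ListMultipartUploadsRequest pending =
        new ListMultipartUploadsRequest(bucket).withPrefix(destPrefix);

    // 2. Abort them, so no in-flight data can ever materialize in the destination.
    for (MultipartUpload upload : s3.listMultipartUploads(pending).getMultipartUploads()) {
      s3.abortMultipartUpload(
          new AbortMultipartUploadRequest(bucket, upload.getKey(), upload.getUploadId()));
    }

    // 3. Clean up whatever the previous, lost instance already wrote to the dest dir.
    for (S3ObjectSummary summary : s3.listObjects(bucket, destPrefix).getObjectSummaries()) {
      s3.deleteObject(bucket, summary.getKey());
    }
  }

  public static void main(String[] args) {
    cleanup(AmazonS3ClientBuilder.defaultClient(), args[0], args[1]);
  }
}
```

Aborting the pending uploads before deleting objects matters: it guarantees nothing new appears in the destination after the cleanup has run.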
---