Github user rdblue commented on a diff in the pull request:
https://github.com/apache/spark/pull/19269#discussion_r141679351
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A data source writer that is returned by
+ * {@link WriteSupport#createWriter(StructType, SaveMode, DataSourceV2Options)}.
+ * It can mix in various writing optimization interfaces to speed up the data saving. The actual
+ * writing logic is delegated to {@link DataWriter}.
+ *
+ * The writing procedure is:
+ * 1. Create a writer factory with {@link #createWriterFactory()}, then serialize and send it to
+ *    all the partitions of the input data (RDD).
+ * 2. For each partition, create a data writer and write the data of the partition with it. If
+ *    all the data is written successfully, call {@link DataWriter#commit()}. If an exception
+ *    happens during the writing, call {@link DataWriter#abort()}. This step may repeat several
+ *    times, as Spark will retry failed tasks.
+ * 3. If all writers committed successfully, call {@link #commit(WriterCommitMessage[])}. If some
+ *    writers aborted, or the job failed for an unknown reason, call {@link #abort()}.
+ *
+ * Spark may launch speculative tasks in step 2, so there may be more than one data writer working
+ * simultaneously on the same partition. Implementations should handle this case correctly, e.g.,
+ * make sure only one data writer can commit successfully, or only admit one committed data writer
+ * and ignore/revert the others at the job level.
+ *
+ * Note that data sources are responsible for providing transactional behavior by implementing
+ * the `commit` and `abort` methods of {@link DataSourceV2Writer} and {@link DataWriter}
+ * correctly. The transaction here is a Spark-level transaction, which may not map to a
+ * transaction in the underlying storage. For example, Spark may successfully write data to a
+ * Cassandra data source, but Cassandra may need some more time to reach consistency at the
+ * storage level.
+ */
[email protected]
+public interface DataSourceV2Writer {
+
+  /**
+   * Creates a writer factory which will be serialized and sent to executors.
+   */
+  DataWriterFactory<Row> createWriterFactory();
+
+  /**
+   * Commits this writing job with a list of commit messages. The commit messages are collected
+   * from all data writers and are produced by {@link DataWriter#commit()}.
+   *
+   * Note that one partition may have multiple committed data writers because of speculative
+   * tasks. Spark will pick the first successful one and use its commit message. Implementations
+   * should be aware of this and handle it correctly, e.g., have a mechanism to make sure only
+   * one data writer can commit successfully, or have a way to clean up the data of already
+   * committed writers.
+   */
+  void commit(WriterCommitMessage[] messages);
+
+  /**
+   * Aborts this writing job because some data writers failed to write the records and aborted,
+   * or because the Spark job failed for some unknown reason.
+   *
+   * Note that some data writers may already have committed in this case; implementations should
+   * be aware of this and clean up the data.
+   */
+  void abort();
--- End diff --
Sorry I missed that!
---
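For readers following the new API, here is a minimal sketch of the driver-side contract described in the Javadoc above. It relies only on the three methods shown in this diff; `StagedFilesMessage`, `ExternalStore`, and the staging/publish scheme are hypothetical, invented for illustration, and are just one way to satisfy the commit/abort semantics:

```java
import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

/**
 * Sketch of a writer that stages each task attempt's output and only
 * publishes the attempts Spark picked. ExternalStore and
 * StagedFilesMessage are hypothetical helpers, not part of this PR.
 */
public class StagingDataSourceWriter implements DataSourceV2Writer {
  private final ExternalStore store;            // hypothetical storage client
  private final DataWriterFactory<Row> factory; // built by the data source
  private final String jobId;

  public StagingDataSourceWriter(
      ExternalStore store, DataWriterFactory<Row> factory, String jobId) {
    this.store = store;
    this.factory = factory;
    this.jobId = jobId;
  }

  @Override
  public DataWriterFactory<Row> createWriterFactory() {
    // The factory is serialized and shipped to executors, where each task
    // attempt creates its own DataWriter, so it must stay small and
    // Serializable.
    return factory;
  }

  @Override
  public void commit(WriterCommitMessage[] messages) {
    // Spark passes one message per partition, taken from the single task
    // attempt whose DataWriter#commit() it picked. Publishing only these
    // staged paths is what makes speculative attempts safe: output of the
    // losing attempts stays in the staging area and is dropped below.
    for (WriterCommitMessage message : messages) {
      store.publish(((StagedFilesMessage) message).stagedPath());
    }
    store.deleteStagingDir(jobId);
  }

  @Override
  public void abort() {
    // Some tasks may have already task-committed when the job fails, so
    // cleanup must cover their staged output too, per the Javadoc above.
    store.deleteStagingDir(jobId);
  }

  /** Hypothetical commit message carrying the staging path of one attempt. */
  public interface StagedFilesMessage extends WriterCommitMessage {
    String stagedPath();
  }

  /** Hypothetical client for the external store being written to. */
  public interface ExternalStore {
    void publish(String stagedPath);      // atomically move into place
    void deleteStagingDir(String jobId);  // remove all staged attempts
  }
}
```

The key point is that nothing becomes visible until `commit(WriterCommitMessage[])` publishes the staged paths, so output from losing speculative attempts, or from attempts that task-committed before a job-level failure, is never exposed.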