Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19269#discussion_r139838908
  
    --- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
 ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.writer;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.Row;
    +import org.apache.spark.sql.SaveMode;
    +import org.apache.spark.sql.sources.v2.DataSourceV2Options;
    +import org.apache.spark.sql.sources.v2.WriteSupport;
    +import org.apache.spark.sql.types.StructType;
    +
    +/**
    + * A data source writer that is returned by
    + * {@link WriteSupport#createWriter(StructType, SaveMode, 
DataSourceV2Options)}.
    + * It can mix in various writing optimization interfaces to speed up the 
data saving. The actual
    + * writing logic is delegated to {@link WriteTask} that is returned by 
{@link #createWriteTask()}.
    + *
    + * The writing procedure is:
    + *   1. Create a write task by {@link #createWriteTask()}, serialize and 
send it to all the
    + *      partitions of the input data(RDD).
    + *   2. For each partition, create a data writer with the write task, and 
write the data of the
    + *      partition with this writer. If all the data are written 
successfully, call
    + *      {@link DataWriter#commit()}. If exception happens during the 
writing, call
    + *      {@link DataWriter#abort()}. This step may repeat several times as 
Spark will retry failed
    + *      tasks.
    + *   3. Wait until all the writers/partitions are finished, i.e., either 
committed or aborted. If
    + *      all partitions are written successfully, call {@link 
#commit(WriterCommitMessage[])}. If
    + *      some partitions failed and aborted, call {@link #abort()}.
    + *
    + * Note that, data sources are responsible for providing transaction 
ability by implementing the
    + * `commit` and `abort` methods of {@link DataSourceV2Writer} and {@link 
DataWriter} correctly.
    + * The transaction here is Spark-level transaction, which may not be the 
underlying storage
    + * transaction. For example, Spark successfully write data to a Cassandra 
data source, but
    + * Cassandra may need some more time to reach consistency at storage level.
    + */
    +@InterfaceStability.Evolving
    +public interface DataSourceV2Writer {
    +
    +  /**
    +   * Creates a write task which will be serialized and sent to executors. 
For each partition of the
    +   * input data(RDD), there will be one write task to write the records.
    +   */
    +  WriteTask<Row> createWriteTask();
    +
    +  /**
    +   * Commits this writing job with a list of commit messages. The commit 
messages are collected from
    +   * all data writers for this writing job and are produced by {@link 
DataWriter#commit()}. This
    +   * also means all the data are written successfully and all data writers 
are committed.
    +   */
    +  void commit(WriterCommitMessage[] messages);
    +
    +  /**
    +   * Aborts this writing job because some data writers are failed to write 
the records and aborted.
    +   */
    +  void abort();
    --- End diff --
    
    Should this accept the commit messages for committed tasks, or will tasks 
be aborted?
    
    I'm thinking of the case where you're writing to S3. Say a data source 
writes all attempt files to the final locations, then removes any attempts that 
are aborted. If the job aborts with some tasks that have already committed, 
then either this should have the option of cleaning up those files (passed in 
the commit message) or all of the tasks should be individually aborted. I'd 
prefer to have this abort clean up successful/committed tasks because the logic 
may be different.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to