Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15696#discussion_r85860514
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala
 ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources
    +
    +import java.util.{Date, UUID}
    +
    +import org.apache.hadoop.fs.Path
    +import org.apache.hadoop.mapreduce._
    +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
    +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
    +
    +import org.apache.spark.SparkHadoopWriter
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.mapred.SparkHadoopMapRedUtil
    +import org.apache.spark.sql.internal.SQLConf
    +
    +
    +object FileCommitProtocol {
    +  class TaskCommitMessage(obj: Any) extends Serializable
    +
    +  object EmptyTaskCommitMessage extends TaskCommitMessage(Unit)
    +}
    +
    +
    +/**
    + * An interface to define how a Spark job commits its outputs. 
Implementations must be serializable.
    + *
    + * The proper call sequence is:
    + *
    + * 1. Driver calls setupJob.
    + * 2. As part of each task's execution, executor calls setupTask and then 
commitTask
    + *    (or abortTask if task failed).
    + * 3. When all necessary tasks completed successfully, the driver calls 
commitJob. If the job
    + *    failed to execute (e.g. too many failed tasks), the job should call 
abortJob.
    + */
    +abstract class FileCommitProtocol {
    +  import FileCommitProtocol._
    +
    +  /**
    +   * Setups up a job. Must be called on the driver before any other 
methods can be invoked.
    +   */
    +  def setupJob(jobContext: JobContext): Unit
    +
    +  /**
    +   * Commits a job after the writes succeed. Must be called on the driver.
    +   */
    +  def commitJob(jobContext: JobContext, taskCommits: 
Seq[TaskCommitMessage]): Unit
    +
    +  /**
    +   * Aborts a job after the writes fail. Must be called on the driver.
    +   *
    +   * Calling this function is a best-effort attempt, because it is 
possible that the driver
    +   * just crashes (or killed) before it can call abort.
    +   */
    +  def abortJob(jobContext: JobContext): Unit
    +
    +  /**
    +   * Sets up a task within a job.
    +   * Must be called before any other task related methods can be invoked.
    +   */
    +  def setupTask(taskContext: TaskAttemptContext): Unit
    +
    +  /**
    +   * Notifies the commit protocol to add a new file, and gets back the 
full path that should be
    +   * used. Must be called on the executors when running tasks.
    +   *
    +   * A full file path consists of the following parts:
    +   *  1. the base path
    +   *  2. some sub-directory within the base path, used to specify 
partitioning
    +   *  3. file prefix, usually some unique job id with the task id
    +   *  4. bucket id
    +   *  5. source specific file extension, e.g. ".snappy.parquet"
    +   *
    +   * The "dir" parameter specifies 2, and "ext" parameter specifies both 4 
and 5, and the rest
    +   * are left to the commit protocol implementation to decide.
    +   */
    +  def addTaskTempFile(taskContext: TaskAttemptContext, dir: 
Option[String], ext: String): String
    +
    +  /**
    +   * Commits a task after the writes succeed. Must be called on the 
executors when running tasks.
    +   */
    +  def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage
    +
    +  /**
    +   * Aborts a task after the writes have failed. Must be called on the 
executors when running tasks.
    +   */
    +  def abortTask(taskContext: TaskAttemptContext): Unit
    +}
    +
    +
    +/**
    + * An [[FileCommitProtocol]] implementation backed by an underlying Hadoop 
OutputCommitter
    + * (from the newer mapreduce API, not the old mapred API).
    + *
    + * Unlike Hadoop's OutputCommitter, this implementation is serializable.
    + */
    +class MapReduceFileCommitterProtocol(path: String, isAppend: Boolean)
    --- End diff --
    
    Should we call this HadoopCommitProtocolWrapper or something to be more 
clear?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to