Github user rezasafi commented on a diff in the pull request:
https://github.com/apache/spark/pull/15769#discussion_r140862939
--- Diff: core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala ---
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale}
+
+import scala.reflect.ClassTag
+import scala.util.DynamicVariable
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.{JobConf, JobID}
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
+import org.apache.spark.{SparkConf, SparkException, TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.OutputMetrics
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.{SerializableConfiguration, Utils}
+
+/**
+ * A helper object that saves an RDD using a Hadoop OutputFormat
+ * (from the newer mapreduce API, not the old mapred API).
+ */
+private[spark]
+object SparkHadoopMapReduceWriter extends Logging {
+
+  /**
+   * The basic workflow of this command is:
+   * 1. Driver-side setup: prepare the data source and Hadoop configuration for the write job
+   *    to be issued.
+   * 2. Issue a write job consisting of one or more executor-side tasks, each of which writes
+   *    all rows within an RDD partition.
+   * 3. If no exception is thrown in a task, commit that task; otherwise abort it. If any
+   *    exception is thrown during task commitment, also abort that task.
+   * 4. If all tasks are committed, commit the job; otherwise abort the job. If any exception
+   *    is thrown during job commitment, also abort the job.
+   */
+  def write[K, V: ClassTag](
+      rdd: RDD[(K, V)],
+      hadoopConf: Configuration): Unit = {
+    // Extract context and configuration from RDD.
+    val sparkContext = rdd.context
+    val stageId = rdd.id
--- End diff ---
Is this accurate? It seems odd that stageId is set equal to rdd.id.
What is the commit protocol here?
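For reference, a minimal sketch of the lifecycle the doc comment above describes, assuming the FileCommitProtocol API on this branch (instantiate/setupJob/setupTask/commitTask/abortTask); the helper names setupCommitter and runTask are illustrative, not quoted from the diff:

    import org.apache.hadoop.mapreduce.TaskAttemptContext
    import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol}
    import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage

    // Driver side: rdd.id (named stageId above) is used as the job id string
    // that the commit protocol stamps on attempt paths -- the naming this
    // comment is asking about.
    def setupCommitter(stageId: Int, outputPath: String): FileCommitProtocol =
      FileCommitProtocol.instantiate(
        className = classOf[HadoopMapReduceCommitProtocol].getName,
        jobId = stageId.toString,
        outputPath = outputPath,
        isAppend = false)

    // Executor side (steps 3-4 of the doc comment): commit the task on
    // success, abort it on any failure, and rethrow so the driver can then
    // abort the whole job.
    def runTask(committer: FileCommitProtocol, ctx: TaskAttemptContext)
               (writePartition: => Unit): TaskCommitMessage = {
      committer.setupTask(ctx)
      try {
        writePartition
        committer.commitTask(ctx)
      } catch {
        case t: Throwable =>
          committer.abortTask(ctx)
          throw t
      }
    }

On the driver, the TaskCommitMessages collected from successful tasks would then be passed to committer.commitJob (step 4), with committer.abortJob called on any failure.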
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]