vanzin commented on a change in pull request #24970: [SPARK-23977][SQL] Support High Performance S3A committers
URL: https://github.com/apache/spark/pull/24970#discussion_r297878237
########## File path: hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/package.scala ##########

@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Package object to assist in switching to the Hadoop 3
+ * [[org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory]] factory
+ * mechanism for dynamically loading committers for the destination stores.
+ *
+ * = Using Alternative Committers with Spark and Hadoop 3 =
+ *
+ * Hadoop 3.1 adds a means to select a different output committer when writing
+ * data to object stores. This can provide higher performance as well as
+ * address the consistency and atomicity problems encountered on some filesystems.
+ *
+ * Every object store can implement its own committer factory: the factory
+ * itself will then instantiate the committer of its choice.
+ *
+ * == Prerequisites ==
+ *
+ * The new APIs are in Hadoop 3.0.2, but the S3A committers are only
+ * found in Hadoop 3.1+.
+ *
+ * The Hadoop cluster needs to be configured with the binding from filesystem
+ * scheme to factory. In Hadoop 3.1+ this is done automatically for s3a in the
+ * file `mapred-default.xml`.
+ * Other stores' committers may need to be explicitly declared.
+ *
+ * {{{
+ * <property>
+ *   <name>mapreduce.outputcommitter.factory.scheme.s3a</name>
+ *   <value>org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory</value>
+ *   <description>
+ *     The committer factory to use when writing data to S3A filesystems.
+ *     If mapreduce.outputcommitter.factory.class is set, it will
+ *     override this property.
+ *   </description>
+ * </property>
+ * }}}
+ *
+ * == Binding a Spark Context to use the new committers for a store ==
+ *
+ * Spark uses the Hadoop committers in
+ * [[org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand]]
+ * by instantiating and then invoking an instance of
+ * [[org.apache.spark.internal.io.HadoopMapReduceCommitProtocol]].
+ * `InsertIntoHadoopFsRelationCommand` needs to be configured to use
+ * [[org.apache.spark.internal.io.cloud.PathOutputCommitProtocol]] as
+ * its commit protocol. This instantiates the committer through
+ * the factory mechanism, and relays operations to it.
+ *
+ * When working with Parquet data, you need to explicitly switch
+ * the Parquet committers to use the same mechanism.
+ *
+ * In `spark-defaults.conf`, everything can be set up with the following settings:
+ * {{{
+ * spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
+ * spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
+ * }}}
+ *
+ * It can be done programmatically by calling [[cloud.bind()]] on the

Review comment:
   While having a programmatic helper is nice, this is in an `internal` package, so no one will find it, since those don't show up in documentation...

   What I'd suggest, perhaps as a separate task, would be to automatically switch to these committers if they're available and there's no drawback in doing so, so that a build with Spark + Hadoop 3.2 + the cloud bindings gives you all of this by default, without needing to configure anything at all. (Perhaps with a flag to disable the automatic use of the committers if someone does run into issues.)
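   As a rough illustration of the configuration the scaladoc describes, here is a minimal sketch that applies the same two settings through `SparkSession.builder()` instead of `spark-defaults.conf`. The property names and committer classes are the ones quoted above; the application name and the `s3a://` output path are placeholders, and the `hadoop-cloud` module (with its Hadoop 3.1+ dependencies) is assumed to be on the classpath.

   ```scala
   import org.apache.spark.sql.SparkSession

   object CloudCommitterConfigExample {
     def main(args: Array[String]): Unit = {
       // Same two settings as the spark-defaults.conf example above, applied
       // programmatically. Property names and values come from the scaladoc;
       // the app name and output path are placeholders.
       val spark = SparkSession.builder()
         .appName("s3a-committer-example")
         .config("spark.sql.sources.commitProtocolClass",
           "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
         .config("spark.sql.parquet.output.committer.class",
           "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
         .getOrCreate()

       // Writes to an s3a:// destination now go through the committer chosen
       // by the PathOutputCommitterFactory mechanism described above.
       spark.range(10).write.parquet("s3a://example-bucket/output/")

       spark.stop()
     }
   }
   ```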

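   For the suggestion in the comment, a minimal sketch of what an automatic switch could look like: probe the classpath for the cloud binding classes and only then set the two properties, guarded by an opt-out flag. The helper object, the `maybeBind` method, and the `spark.internal.io.cloud.autoBind` flag are hypothetical names used only for illustration; they are not part of this patch.

   ```scala
   import org.apache.spark.SparkConf

   // Hypothetical helper sketching the reviewer's suggestion; names and flag
   // are illustrative only and do not exist in the patch under review.
   object CloudCommitterAutoBind {

     private val CommitProtocol =
       "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"
     private val ParquetCommitter =
       "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter"

     private def classAvailable(name: String): Boolean =
       try {
         Class.forName(name, false, Thread.currentThread().getContextClassLoader)
         true
       } catch {
         case _: ClassNotFoundException => false
       }

     /** Apply the bindings if the classes are present and the user has not opted out. */
     def maybeBind(conf: SparkConf): SparkConf = {
       val enabled = conf.getBoolean("spark.internal.io.cloud.autoBind", defaultValue = true)
       if (enabled && classAvailable(CommitProtocol) && classAvailable(ParquetCommitter)) {
         // Respect any committer the user has already configured explicitly.
         if (!conf.contains("spark.sql.sources.commitProtocolClass")) {
           conf.set("spark.sql.sources.commitProtocolClass", CommitProtocol)
         }
         if (!conf.contains("spark.sql.parquet.output.committer.class")) {
           conf.set("spark.sql.parquet.output.committer.class", ParquetCommitter)
         }
       }
       conf
     }
   }
   ```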