vanzin commented on a change in pull request #25007: [SPARK-28209][CORE][SHUFFLE] Proposed new shuffle writer API
URL: https://github.com/apache/spark/pull/25007#discussion_r307439093
##########
File path: core/src/main/java/org/apache/spark/shuffle/api/ShufflePartitionWriter.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.api;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+
+import org.apache.spark.annotation.Private;
+import org.apache.spark.shuffle.sort.io.DefaultWritableByteChannelWrapper;
+
+/**
+ * :: Private ::
+ * An interface for opening streams to persist partition bytes to a backing data store.
+ * <p>
+ * This writer stores bytes for one (mapper, reducer) pair, corresponding to one shuffle
+ * block.
+ *
+ * @since 3.0.0
+ */
+@Private
+public interface ShufflePartitionWriter {
+
+  /**
+   * Open and return an {@link OutputStream} that can write bytes to the underlying
+   * data store.
+   * <p>
+   * This method will only be called once on this partition writer in the map task, to write the
+   * bytes to the partition. The output stream will only be used to write the bytes for this
+   * partition. The map task closes this output stream upon writing all the bytes for this
+   * block, or if the write fails for any reason.
+   * <p>
+   * Implementations that intend on combining the bytes for all the partitions written by this
+   * map task should reuse the same OutputStream instance across all the partition writers provided
+   * by the parent {@link ShuffleMapOutputWriter}. If one does so, ensure that
+   * {@link OutputStream#close()} does not close the resource, since it will be reused across
+   * partition writes. The underlying resources should be cleaned up in
+   * {@link ShuffleMapOutputWriter#commitAllPartitions()} and
+   * {@link ShuffleMapOutputWriter#abort(Throwable)}.
+   */
+  OutputStream openStream() throws IOException;
+
+  /**
+   * Opens and returns a {@link WritableByteChannelWrapper} for transferring bytes from
+   * input byte channels to the underlying shuffle data store.
+   * <p>
+   * This method will only be called once on this partition writer in the map task, to write the
+   * bytes to the partition. The channel will only be used to write the bytes for this
+   * partition. The map task closes this channel upon writing all the bytes for this
+   * block, or if the write fails for any reason.
+   * <p>
+   * Implementations that intend on combining the bytes for all the partitions written by this
+   * map task should reuse the same channel instance across all the partition writers provided
+   * by the parent {@link ShuffleMapOutputWriter}. If one does so, ensure that
+   * {@link WritableByteChannelWrapper#close()} does not close the resource, since it
+   * will be reused across partition writes. The underlying resources should be cleaned up in
+   * {@link ShuffleMapOutputWriter#commitAllPartitions()} and
+   * {@link ShuffleMapOutputWriter#abort(Throwable)}.
+   * <p>
+   * This method is primarily for advanced optimizations where bytes can be copied from the input
+   * spill files to the output channel without copying data into memory.
+   * <p>
+   * The default implementation should be sufficient for most situations. Only override this

Review comment:
Actually, I'm not sure this is true if the goal is to provide an optimization. In that case, the default implementation is only sufficient if your stream is a `FileOutputStream` (I just checked what `Channels.newChannel()` does). Otherwise, the wrapper it creates will copy data into user memory, basically negating the optimization. (Which may be an argument for returning `null` here and falling back to the normal IO path when that happens.)
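
To make the point about `Channels.newChannel()` concrete, here is a minimal sketch of the check being described. It is not code from the PR: the class `ChannelFallbackSketch` and the helper `tryFileChannel` are hypothetical names, and `Optional.empty()` stands in for the `null` return suggested above. It relies on the OpenJDK behavior that `Channels.newChannel(OutputStream)` hands back the stream's own `FileChannel` only when the stream is exactly a `FileOutputStream`; as far as I know that is an implementation detail rather than a documented guarantee.

```java
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Optional;

public class ChannelFallbackSketch {

  /**
   * Hypothetical helper: wraps the stream with Channels.newChannel() and
   * returns the result only if it is a real FileChannel. An empty Optional
   * signals "no zero-copy path here, fall back to normal stream IO".
   */
  static Optional<FileChannel> tryFileChannel(OutputStream out) {
    WritableByteChannel ch = Channels.newChannel(out);
    if (ch instanceof FileChannel) {
      return Optional.of((FileChannel) ch);
    }
    // Any other wrapper copies bytes through an intermediate buffer in
    // user memory, which defeats the purpose of the channel-based path.
    return Optional.empty();
  }

  public static void main(String[] args) throws IOException {
    File tmp = File.createTempFile("shuffle-sketch", ".bin");
    tmp.deleteOnExit();
    try (FileOutputStream fos = new FileOutputStream(tmp)) {
      System.out.println("FileOutputStream gives FileChannel: "
          + tryFileChannel(fos).isPresent());
    }
    System.out.println("ByteArrayOutputStream gives FileChannel: "
        + tryFileChannel(new ByteArrayOutputStream()).isPresent());
  }
}
```

Under that assumption, a caller could use the channel path only when the `Optional` is present and otherwise write through `openStream()` as usual.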
