mccheah commented on a change in pull request #25007: [SPARK-28209][CORE][SHUFFLE] Proposed new shuffle writer API
URL: https://github.com/apache/spark/pull/25007#discussion_r310322735
 
 

 ##########
 File path: core/src/main/java/org/apache/spark/shuffle/api/ShufflePartitionWriter.java
 ##########
 @@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.api;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+
+import org.apache.spark.annotation.Private;
+import org.apache.spark.shuffle.sort.io.DefaultWritableByteChannelWrapper;
+
+/**
+ * :: Private ::
+ * An interface for opening streams to persist partition bytes to a backing data store.
+ * <p>
+ * This writer stores bytes for one (mapper, reducer) pair, corresponding to one shuffle block.
+ *
+ * @since 3.0.0
+ */
+@Private
+public interface ShufflePartitionWriter {
+
+  /**
+   * Open and return an {@link OutputStream} that can write bytes to the underlying data store.
+   * <p>
+   * This method will only be called once on this partition writer in the map task, to write the
+   * bytes to the partition. The output stream will only be used to write the bytes for this
+   * partition. The map task closes this output stream upon writing all the bytes for this
+   * block, or if the write fails for any reason.
+   * <p>
+   * Implementations that intend to combine the bytes for all the partitions written by this
+   * map task should reuse the same OutputStream instance across all the partition writers provided
+   * by the parent {@link ShuffleMapOutputWriter}. If one does so, ensure that
+   * {@link OutputStream#close()} does not close the resource, since it will be reused across
+   * partition writes. The underlying resources should be cleaned up in
+   * {@link ShuffleMapOutputWriter#commitAllPartitions()} and
+   * {@link ShuffleMapOutputWriter#abort(Throwable)}.
+   */
+  OutputStream openStream() throws IOException;
+
+  /**
+   * Opens and returns a {@link WritableByteChannelWrapper} for transferring bytes from
+   * input byte channels to the underlying shuffle data store.
+   * <p>
+   * This method will only be called once on this partition writer in the map task, to write the
+   * bytes to the partition. The channel will only be used to write the bytes for this
+   * partition. The map task closes this channel upon writing all the bytes for this
+   * block, or if the write fails for any reason.
+   * <p>
+   * Implementations that intend to combine the bytes for all the partitions written by this
+   * map task should reuse the same channel instance across all the partition writers provided
+   * by the parent {@link ShuffleMapOutputWriter}. If one does so, ensure that
+   * {@link WritableByteChannelWrapper#close()} does not close the resource, since it
+   * will be reused across partition writes. The underlying resources should be cleaned up in
+   * {@link ShuffleMapOutputWriter#commitAllPartitions()} and
+   * {@link ShuffleMapOutputWriter#abort(Throwable)}.
+   * <p>
+   * This method is primarily for advanced optimizations where bytes can be copied from the input
+   * spill files to the output channel without copying data into memory.
+   * <p>
+   * The default implementation should be sufficient for most situations. Only override this
+   * method if there is a very specific optimization that needs to be built.
+   * <p>
+   * Note that the returned {@link WritableByteChannelWrapper} itself is closed, but not the
+   * underlying channel that is returned by {@link WritableByteChannelWrapper#channel()}. Ensure that
+   * the underlying channel is cleaned up in {@link WritableByteChannelWrapper#close()},
+   * {@link ShuffleMapOutputWriter#commitAllPartitions()}, or
+   * {@link ShuffleMapOutputWriter#abort(Throwable)}.
+   */
+  default WritableByteChannelWrapper openChannelWrapper() throws IOException {
+    return new DefaultWritableByteChannelWrapper(Channels.newChannel(openStream()));
+  }
+
+  /**
+   * Returns the number of bytes written by either this writer's output stream opened by
+   * {@link #openStream()} or the byte channel opened by {@link #openChannelWrapper()}.
+   * <p>
+   * This can be different from the number of bytes given by the caller. For example, the
+   * stream might compress or encrypt the bytes before persisting the data to the backing
+   * data store.
+   */
+  long getNumBytesWritten();
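
For illustration, here is a minimal sketch of the `openStream()` contract documented above: a partition writer that funnels every partition into one shared stream and suppresses `close()` on the wrapper it hands back. The class name, the shared-stream constructor, and the byte counting are hypothetical and not part of the proposed API.

```java
// Hypothetical sketch only: SharedStreamPartitionWriter is not part of the
// proposed API; it illustrates reusing one OutputStream across partitions.
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.spark.shuffle.api.ShufflePartitionWriter;

final class SharedStreamPartitionWriter implements ShufflePartitionWriter {

  // Shared by every partition writer from the same ShuffleMapOutputWriter;
  // the parent writer closes it in commitAllPartitions() or abort(Throwable).
  private final OutputStream sharedOutput;
  private long bytesWritten = 0L;

  SharedStreamPartitionWriter(OutputStream sharedOutput) {
    this.sharedOutput = sharedOutput;
  }

  @Override
  public OutputStream openStream() {
    // The map task closes the returned stream after writing this partition,
    // so close() here only flushes and leaves the shared stream open.
    return new FilterOutputStream(sharedOutput) {
      @Override
      public void write(int b) throws IOException {
        out.write(b);
        bytesWritten++;
      }

      @Override
      public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
        bytesWritten += len;
      }

      @Override
      public void close() throws IOException {
        out.flush();
      }
    };
  }

  @Override
  public long getNumBytesWritten() {
    return bytesWritten;
  }
}
```

The parent map output writer (not shown here) would own the shared stream and release it in `commitAllPartitions()` or `abort(Throwable)`, as the Javadoc prescribes.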
 
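Likewise, a hedged sketch of the zero-copy path that `openChannelWrapper()` is meant to enable. It assumes `WritableByteChannelWrapper` lives alongside `ShufflePartitionWriter` in `org.apache.spark.shuffle.api`, exposes the channel via `channel()`, and is closeable, as the references in the Javadoc above suggest; the `FileChannelWrapper` name and constructor are hypothetical.

```java
// Hypothetical sketch only: a wrapper exposing a FileChannel so the caller can
// transfer spill-file bytes without copying them through user-space buffers.
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

import org.apache.spark.shuffle.api.WritableByteChannelWrapper;

final class FileChannelWrapper implements WritableByteChannelWrapper {

  private final FileChannel channel;

  FileChannelWrapper(FileChannel channel) {
    this.channel = channel;
  }

  @Override
  public WritableByteChannel channel() {
    // Exposing the FileChannel lets the caller use FileChannel#transferTo to
    // move spill-file bytes to this channel without copying them into memory.
    return channel;
  }

  @Override
  public void close() throws IOException {
    // If the channel is shared across partition writers, leave it open here and
    // close it in ShuffleMapOutputWriter#commitAllPartitions() or abort() instead.
    channel.close();
  }
}
```

An implementation's `openChannelWrapper()` could return such a wrapper over its output file's channel instead of falling back to the default `Channels.newChannel(openStream())` path.
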
 Review comment:
   But again, do we call `getNumBytesWritten` before or after calling `close` on this object? If before, does it include the bytes that might be padded in `close`-ing the stream? If after, are we going to be invoking methods on a closed resource, and is that reasonable?
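
   For reference, a sketch of the caller-side ordering the question is about; `writeRecords` is a hypothetical stand-in for the map task's record-writing loop and is not part of this PR.

```java
// Hypothetical caller-side sequence illustrating when getNumBytesWritten()
// is read relative to closing the per-partition stream.
import java.io.IOException;
import java.io.OutputStream;

import org.apache.spark.shuffle.api.ShufflePartitionWriter;

final class PartitionWriteOrdering {

  static long writeOnePartition(ShufflePartitionWriter writer) throws IOException {
    OutputStream out = writer.openStream();
    try {
      writeRecords(out);
    } finally {
      // Closing may itself append bytes (compression/encryption footers, padding).
      out.close();
    }
    // The open question: does this count include bytes appended while closing,
    // and is calling it against an already-closed resource well-defined?
    return writer.getNumBytesWritten();
  }

  private static void writeRecords(OutputStream out) throws IOException {
    // Placeholder for serializing and writing this partition's records.
    out.write(new byte[0]);
  }
}
```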
