[ https://issues.apache.org/jira/browse/HADOOP-11847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14549697#comment-14549697 ]

Yi Liu edited comment on HADOOP-11847 at 5/19/15 3:17 AM:
----------------------------------------------------------

Thanks Kai for the patch.

*In AbstractRawErasureCoder.java*
{code}
+      if (buffers[i] == null) {
+        if (allowNull) {
+          continue;
+        }
+        throw new HadoopIllegalArgumentException("Invalid buffer found, not allowing null");
+      }
{code}
Using the following code may be simpler:
{code}
if (buffers[i] == null && !allowNull) {
  throw new HadoopIllegalArgumentException("Invalid buffer found, not allowing null");
}
{code}
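For context, the surrounding loop might then read as follows (just a sketch; {{checkBuffers}} and its parameters are assumed names here, not taken from the patch):
{code}
  /**
   * Sketch only: validate the given buffers, rejecting null entries unless
   * the caller explicitly allows them. (Method name is assumed.)
   */
  protected void checkBuffers(byte[][] buffers, boolean allowNull) {
    for (int i = 0; i < buffers.length; i++) {
      if (buffers[i] == null && !allowNull) {
        throw new HadoopIllegalArgumentException(
            "Invalid buffer found, not allowing null");
      }
    }
  }
{code}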


*In AbstractRawErasureDecoder.java*
Rename {{findGoodInput}} to {{getFirstNotNullInput}}; we can use Java generics, and the implementation can be simplified:
{code}
  /**
   * Find the first non-null input.
   * @param inputs the inputs to search
   * @return the first non-null input
   */
  protected <T> T getFirstNotNullInput(T[] inputs) {
    for (T input : inputs) {
      if (input != null) {
        return input;
      }
    }

    throw new HadoopIllegalArgumentException(
        "Invalid inputs are found, all being null");
  }
{code}
Looking at the above, isn't it cleaner?
Then you can change
{code}
ByteBuffer goodInput = (ByteBuffer) findGoodInput(inputs);
{code}
to
{code}
ByteBuffer firstNotNullInput = getFirstNotNullInput(inputs);
{code}

{code}
protected int[] getErasedOrNotToReadIndexes(Object[] inputs) {
    int[] invalidIndexes = new int[inputs.length];
{code}
We can accept an {{int erasedNum}} parameter; then we can allocate an array of exactly the right size and avoid the array copy, as in the sketch below.
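For example (a sketch, assuming the caller passes the known count of null entries as {{erasedNum}}):
{code}
  protected int[] getErasedOrNotToReadIndexes(Object[] inputs, int erasedNum) {
    // Allocate exactly erasedNum slots, so no trailing array copy is needed.
    int[] erasedOrNotToReadIndexes = new int[erasedNum];
    int idx = 0;
    for (int i = 0; i < inputs.length && idx < erasedNum; i++) {
      if (inputs[i] == null) {
        erasedOrNotToReadIndexes[idx++] = i;
      }
    }
    return erasedOrNotToReadIndexes;
  }
{code}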


*In RSRawDecoder.java*
{code}
/**
   * We need a set of reusable buffers either for the bytes array
   * decoding version or direct buffer decoding version. Normally not both.
   *
   * For both input and output, in addition to the valid buffers from the caller
   * passed from above, we need to provide extra buffers for the internal
   * decoding implementation. For input, the caller should provide at least
   * numDataUnits valid buffers (non-NULL); for output, the caller should 
   * provide no more than numParityUnits but at least one buffers. And the left
   * buffers will be borrowed from either bytesArrayBuffersForInput or 
   * bytesArrayBuffersForOutput, for the bytes array version.
   *
   */
  // Reused buffers for decoding with bytes arrays
  private byte[][] bytesArrayBuffers;
  private byte[][] adjustedByteArrayInputsParameter;
  private byte[][] adjustedByteArrayOutputsParameter;
  private int[] adjustedInputOffsets;
  private int[] adjustedOutputOffsets;

  // Reused buffers for decoding with direct ByteBuffers
  private ByteBuffer[] directBuffers;
  private ByteBuffer[] adjustedDirectBufferInputsParameter;
  private ByteBuffer[] adjustedDirectBufferOutputsParameter;
{code}
I don't think we need these.

{code}
@Override
  protected void doDecode(byte[][] inputs, int[] inputOffsets,
                          int dataLen, int[] erasedIndexes,
                          byte[][] outputs, int[] outputOffsets) {
    ensureBytesArrayBuffers(dataLen);

    /**
     * As passed parameters are friendly to callers but not to the underlying
     * implementations, so we have to adjust them before calling doDecoder.
     */

    int[] erasedOrNotToReadIndexes = getErasedOrNotToReadIndexes(inputs);
    int bufferIdx = 0, erasedIdx;

    // Prepare for adjustedInputsParameter and adjustedInputOffsets
    System.arraycopy(inputs, 0, adjustedByteArrayInputsParameter,
        0, inputs.length);
    System.arraycopy(inputOffsets, 0, adjustedInputOffsets,
        0, inputOffsets.length);
    for (int i = 0; i < erasedOrNotToReadIndexes.length; i++) {
      // Borrow it from bytesArrayBuffersForInput for the temp usage.
      erasedIdx = erasedOrNotToReadIndexes[i];
      adjustedByteArrayInputsParameter[erasedIdx] =
          resetBuffer(bytesArrayBuffers[bufferIdx++], 0, dataLen);
      adjustedInputOffsets[erasedIdx] = 0; // Always 0 for such temp input
    }

    // Prepare for adjustedOutputsParameter
    for (int i = 0; i < adjustedByteArrayOutputsParameter.length; i++) {
      adjustedByteArrayOutputsParameter[i] =
          resetBuffer(bytesArrayBuffers[bufferIdx++], 0, dataLen);
      adjustedOutputOffsets[i] = 0; // Always 0 for such temp output
    }
    for (int outputIdx = 0, i = 0;
         i < erasedIndexes.length; i++, outputIdx++) {
      for (int j = 0; j < erasedOrNotToReadIndexes.length; j++) {
        // If this index is one requested by the caller via erasedIndexes, then
        // we use the passed output buffer to avoid copying data thereafter.
        if (erasedIndexes[i] == erasedOrNotToReadIndexes[j]) {
          adjustedByteArrayOutputsParameter[j] =
              resetBuffer(outputs[outputIdx], 0, dataLen);
          adjustedOutputOffsets[j] = outputOffsets[outputIdx];
        }
      }
    }

    doDecodeImpl(adjustedByteArrayInputsParameter, adjustedInputOffsets,
        dataLen, erasedOrNotToReadIndexes,
        adjustedByteArrayOutputsParameter, adjustedOutputOffsets);
  }
{code}
I think we can use the inputs/outputs directly, with no copying; we just need some modification to {{RSUtil.GF}}. Maybe we can discuss offline.
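The idea, roughly (a sketch only; it assumes {{RSUtil.GF}} is changed to treat null entries as erased or not-to-read units, which is not the case today):
{code}
  @Override
  protected void doDecode(byte[][] inputs, int[] inputOffsets,
                          int dataLen, int[] erasedIndexes,
                          byte[][] outputs, int[] outputOffsets) {
    // No adjusted copies or borrowed buffers: the null entries in inputs
    // would already mark the erased/not-to-read units for the modified GF code.
    doDecodeImpl(inputs, inputOffsets, dataLen, erasedIndexes,
        outputs, outputOffsets);
  }
{code}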

{code}
TODO: HADOOP-11871
 * currently this implementation will compute and decode not to read
 * units unnecessarily due to the underlying implementation limit in GF.
{code}
It would be easy to implement that in this JIRA.

*In test*
I'd like to see the following tests (here I only use 6+3 as an example; in real tests, you can cover different schemas; a sketch of the resulting test matrix follows the list):
1. All missed chunks are data chunks (we may have at most 3 chunks missed, so we need to test 1 chunk missed, 2 chunks missed, and 3 chunks missed).
2. All missed chunks are parity chunks (same as above).
3. Both data and parity chunks are missed (same as above).
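A sketch of how the erasure patterns for these three cases could be enumerated for 6+3 (names and the particular index choices are illustrative only):
{code}
  // Illustrative only: erased-index patterns to cover for a 6+3 schema,
  // where data units are indexes 0..5 and parity units are indexes 6..8.
  private static java.util.List<int[]> erasurePatterns(int numData) {
    java.util.List<int[]> patterns = new java.util.ArrayList<>();
    // Case 1: only data chunks missed (1, 2 and 3 chunks)
    patterns.add(new int[]{0});
    patterns.add(new int[]{0, 2});
    patterns.add(new int[]{0, 2, 4});
    // Case 2: only parity chunks missed (1, 2 and 3 chunks)
    patterns.add(new int[]{numData});
    patterns.add(new int[]{numData, numData + 1});
    patterns.add(new int[]{numData, numData + 1, numData + 2});
    // Case 3: data and parity chunks missed together (2 and 3 chunks)
    patterns.add(new int[]{0, numData});
    patterns.add(new int[]{0, 2, numData + 1});
    return patterns;
  }
{code}
Each pattern would then drive one encode/decode round trip, asserting that the recovered chunks equal the originals.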



> Enhance raw coder allowing to read least required inputs in decoding
> --------------------------------------------------------------------
>
>                 Key: HADOOP-11847
>                 URL: https://issues.apache.org/jira/browse/HADOOP-11847
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: io
>            Reporter: Kai Zheng
>            Assignee: Kai Zheng
>              Labels: BB2015-05-TBR
>         Attachments: HADOOP-11847-HDFS-7285-v3.patch, 
> HADOOP-11847-HDFS-7285-v4.patch, HADOOP-11847-HDFS-7285-v5.patch, 
> HADOOP-11847-v1.patch, HADOOP-11847-v2.patch
>
>
> This is to enhance the raw erasure coder to allow reading only the least 
> required inputs while decoding. It will also refine and document the relevant 
> APIs for better understanding and usage. When using the least required inputs, 
> it may add computing overhead but will possibly outperform overall, since less 
> network traffic and disk IO are involved.
> This is something we planned to do, but I was just reminded of it by [~zhz]'s 
> question raised in HDFS-7678, also copied here:
> bq.Kai Zheng I have a question about decoding: in a (6+3) schema, if block #2 
> is missing, and I want to repair it with blocks 0, 1, 3, 4, 5, 8, how should 
> I construct the inputs to RawErasureDecoder#decode?
> With this work, hopefully the answer to above question would be obvious.


