[ https://issues.apache.org/jira/browse/HADOOP-11847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14549697#comment-14549697 ]
Yi Liu edited comment on HADOOP-11847 at 5/19/15 3:17 AM:
----------------------------------------------------------

Thanks Kai for the patch.

*In AbstractRawErasureCoder.java*
{code}
+      if (buffers[i] == null) {
+        if (allowNull) {
+          continue;
+        }
+        throw new HadoopIllegalArgumentException("Invalid buffer found, not allowing null");
+      }
{code}
Using the following code may be simpler:
{code}
if (buffers[i] == null && !allowNull) {
  throw new HadoopIllegalArgumentException("Invalid buffer found, not allowing null");
}
{code}

*In AbstractRawErasureDecoder.java*
Rename {{findGoodInput}} to {{getFirstNotNullInput}}; we can use Java generics, and the implementation can be simplified:
{code}
/**
 * Find the first non-null input.
 * @param inputs
 * @return the first non-null input
 */
protected <T> T getFirstNotNullInput(T[] inputs) {
  for (T input : inputs) {
    if (input != null) {
      return input;
    }
  }
  throw new HadoopIllegalArgumentException(
      "Invalid inputs are found, all being null");
}
{code}
Looking at the above, isn't it cleaner? Then you can change
{code}
ByteBuffer goodInput = (ByteBuffer) findGoodInput(inputs);
{code}
to
{code}
ByteBuffer firstNotNullInput = getFirstNotNullInput(inputs);
{code}
{code}
protected int[] getErasedOrNotToReadIndexes(Object[] inputs) {
  int[] invalidIndexes = new int[inputs.length];
{code}
We can accept an {{int erasedNum}} parameter, so we can allocate the exact array size and avoid a trailing array copy.
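To illustrate the suggestion, a minimal sketch of that variant; the parameter name {{erasedNum}} and the overflow check are my assumptions, not something from the patch:
{code}
/**
 * Sketch only: collect the indexes of all null (erased or not-to-read)
 * inputs. Taking erasedNum lets us allocate the exact result size up
 * front, so no trailing array copy is needed.
 */
protected int[] getErasedOrNotToReadIndexes(Object[] inputs, int erasedNum) {
  int[] invalidIndexes = new int[erasedNum];
  int idx = 0;
  for (int i = 0; i < inputs.length; i++) {
    if (inputs[i] == null) {
      if (idx == erasedNum) {
        throw new HadoopIllegalArgumentException(
            "More null inputs found than the expected " + erasedNum);
      }
      invalidIndexes[idx++] = i;
    }
  }
  return invalidIndexes;
}
{code}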
*In RSRawDecoder.java*
{code}
  /**
   * We need a set of reusable buffers either for the bytes array
   * decoding version or direct buffer decoding version. Normally not both.
   *
   * For both input and output, in addition to the valid buffers from the
   * caller passed from above, we need to provide extra buffers for the
   * internal decoding implementation. For input, the caller should provide
   * at least numDataUnits valid buffers (non-NULL); for output, the caller
   * should provide no more than numParityUnits but at least one buffer. And
   * the remaining buffers will be borrowed from either
   * bytesArrayBuffersForInput or bytesArrayBuffersForOutput, for the bytes
   * array version.
   */
  // Reused buffers for decoding with bytes arrays
  private byte[][] bytesArrayBuffers;
  private byte[][] adjustedByteArrayInputsParameter;
  private byte[][] adjustedByteArrayOutputsParameter;
  private int[] adjustedInputOffsets;
  private int[] adjustedOutputOffsets;

  // Reused buffers for decoding with direct ByteBuffers
  private ByteBuffer[] directBuffers;
  private ByteBuffer[] adjustedDirectBufferInputsParameter;
  private ByteBuffer[] adjustedDirectBufferOutputsParameter;
{code}
I don't think we need these.
{code}
  @Override
  protected void doDecode(byte[][] inputs, int[] inputOffsets,
                          int dataLen, int[] erasedIndexes,
                          byte[][] outputs, int[] outputOffsets) {
    ensureBytesArrayBuffers(dataLen);

    /*
     * As the passed parameters are friendly to callers but not to the
     * underlying implementations, we have to adjust them before calling
     * doDecodeImpl.
     */

    int[] erasedOrNotToReadIndexes = getErasedOrNotToReadIndexes(inputs);
    int bufferIdx = 0, erasedIdx;

    // Prepare for adjustedInputsParameter and adjustedInputOffsets
    System.arraycopy(inputs, 0, adjustedByteArrayInputsParameter,
        0, inputs.length);
    System.arraycopy(inputOffsets, 0, adjustedInputOffsets,
        0, inputOffsets.length);
    for (int i = 0; i < erasedOrNotToReadIndexes.length; i++) {
      // Borrow it from bytesArrayBuffersForInput for the temp usage.
      erasedIdx = erasedOrNotToReadIndexes[i];
      adjustedByteArrayInputsParameter[erasedIdx] =
          resetBuffer(bytesArrayBuffers[bufferIdx++], 0, dataLen);
      adjustedInputOffsets[erasedIdx] = 0; // Always 0 for such temp input
    }

    // Prepare for adjustedOutputsParameter
    for (int i = 0; i < adjustedByteArrayOutputsParameter.length; i++) {
      adjustedByteArrayOutputsParameter[i] =
          resetBuffer(bytesArrayBuffers[bufferIdx++], 0, dataLen);
      adjustedOutputOffsets[i] = 0; // Always 0 for such temp output
    }
    for (int outputIdx = 0, i = 0; i < erasedIndexes.length; i++, outputIdx++) {
      for (int j = 0; j < erasedOrNotToReadIndexes.length; j++) {
        // If this index is one requested by the caller via erasedIndexes, then
        // we use the passed output buffer to avoid copying data thereafter.
        if (erasedIndexes[i] == erasedOrNotToReadIndexes[j]) {
          adjustedByteArrayOutputsParameter[j] =
              resetBuffer(outputs[outputIdx], 0, dataLen);
          adjustedOutputOffsets[j] = outputOffsets[outputIdx];
        }
      }
    }

    doDecodeImpl(adjustedByteArrayInputsParameter, adjustedInputOffsets,
        dataLen, erasedOrNotToReadIndexes,
        adjustedByteArrayOutputsParameter, adjustedOutputOffsets);
  }
{code}
I think we can use the inputs/outputs directly, with no need for a copy; we just need to make some modifications to {{RSUtil.GF}}. Maybe we can discuss offline.
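To make the direct-use idea a bit more concrete: if the GF primitives accepted per-buffer offsets, {{doDecode}} could pass the caller's arrays straight through instead of copying into the adjusted parameters. The method below is purely hypothetical; nothing with this name or signature exists in {{RSUtil.GF}} today. It only sketches the shape of such a change:
{code}
// Hypothetical offset-aware variant of a GF substitute-style primitive,
// assuming the usual GF(2^8) multiplication table of the GaloisField class.
// Accumulates output += sum_i(inputs[i] * x^i), addition being XOR in
// GF(2^8), reading every input at its own offset so that callers need
// not re-align their buffers first.
public void substitute(byte[][] inputs, int[] inputOffsets, int dataLen,
    byte[] output, int outputOffset, int x) {
  int y = 1; // x^0
  for (int i = 0; i < inputs.length; i++) {
    byte[] input = inputs[i];
    int offset = inputOffsets[i]; // honor the caller's offset per buffer
    for (int j = 0; j < dataLen; j++) {
      output[outputOffset + j] ^= mulTable[y][input[offset + j] & 0xFF];
    }
    y = mulTable[x][y]; // advance to the next power of x
  }
}
{code}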
{code}
 * TODO: HADOOP-11871
 * currently this implementation will compute and decode not-to-read
 * units unnecessarily due to the underlying implementation limit in GF.
{code}
It's easy to implement this in this JIRA.

*In test*
I'd like to see the following tests (here I only use 6+3 as an example; in the real tests, you can cover different schemas) -- see the sketch after this list:
1. All missed chunks are data chunks (we may have at most 3 chunks missed, so here we need to test 1 chunk missed, 2 chunks missed, and 3 chunks missed).
2. All missed chunks are parity chunks (same as above).
3. Both data and parity chunks are missed (same as above).
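To pin these scenarios down, a sketch of how they could be enumerated for RS(6, 3). The helper {{decodeAndVerify}} is assumed, not an existing API; it stands for whatever encodes test chunks, erases the given indexes, decodes, and compares against the original data:
{code}
@Test
public void testVariousErasedIndexes() {
  // In a 6+3 schema, data units are indexes 0..5 and parity units 6..8,
  // and at most 3 units may be missing at once.

  // 1. All missed chunks are data chunks: 1, 2, then 3 erased.
  decodeAndVerify(new int[] {0});
  decodeAndVerify(new int[] {1, 3});
  decodeAndVerify(new int[] {0, 2, 5});

  // 2. All missed chunks are parity chunks: 1, 2, then 3 erased.
  decodeAndVerify(new int[] {6});
  decodeAndVerify(new int[] {6, 8});
  decodeAndVerify(new int[] {6, 7, 8});

  // 3. Both data and parity chunks are missed.
  decodeAndVerify(new int[] {2, 7});
  decodeAndVerify(new int[] {1, 4, 8});
  decodeAndVerify(new int[] {0, 5, 6});
}
{code}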
> Enhance raw coder allowing to read least required inputs in decoding
> --------------------------------------------------------------------
>
>                 Key: HADOOP-11847
>                 URL: https://issues.apache.org/jira/browse/HADOOP-11847
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: io
>            Reporter: Kai Zheng
>            Assignee: Kai Zheng
>              Labels: BB2015-05-TBR
>         Attachments: HADOOP-11847-HDFS-7285-v3.patch,
> HADOOP-11847-HDFS-7285-v4.patch, HADOOP-11847-HDFS-7285-v5.patch,
> HADOOP-11847-v1.patch, HADOOP-11847-v2.patch
>
> This is to enhance the raw erasure coder to allow reading only the least
> required inputs while decoding. It will also refine and document the
> relevant APIs for better understanding and usage. When using the least
> required inputs, it may add computing overhead but will possibly outperform
> overall, since less network traffic and disk IO are involved.
> This is something planned to do but just got reminded by [~zhz]'s question
> raised in HDFS-7678, also copied here:
> bq. Kai Zheng I have a question about decoding: in a (6+3) schema, if block
> #2 is missing, and I want to repair it with blocks 0, 1, 3, 4, 5, 8, how
> should I construct the inputs to RawErasureDecoder#decode?
> With this work, hopefully the answer to the above question will be obvious.