This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch ozone-0.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/ozone-0.4 by this push:
     new 4373c12  HDDS-1173. Fix a data corruption bug in BlockOutputStream. Contributed by Shashikant Banerjee.
4373c12 is described below

commit 4373c127013c114ffec7f99262f2565009fa46b8
Author: Shashikant Banerjee <shashik...@apache.org>
AuthorDate: Mon Mar 11 23:15:49 2019 +0530

    HDDS-1173. Fix a data corruption bug in BlockOutputStream. Contributed by Shashikant Banerjee.

    (cherry picked from commit b4aa24d3c5ad1b9309a58795e4b48e567695c4e4)
---
 .../hadoop/hdds/scm/storage/BlockOutputStream.java | 232 +++++++++------------
 .../apache/hadoop/hdds/scm/storage/BufferPool.java | 106 ++++++++++
 .../ozone/client/io/BlockOutputStreamEntry.java    |  23 +-
 .../hadoop/ozone/client/io/KeyOutputStream.java    |  21 +-
 .../rpc/TestCloseContainerHandlingByClient.java    |  62 ++++--
 .../commandhandler/TestBlockDeletion.java          |   2 +-
 6 files changed, 279 insertions(+), 167 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 2e156b3..fe41f57 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.Buffer;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.UUID;
@@ -87,7 +86,7 @@ public class BlockOutputStream extends OutputStream {
   private final long streamBufferFlushSize;
   private final long streamBufferMaxSize;
   private final long watchTimeout;
-  private List<ByteBuffer> bufferList;
+  private BufferPool bufferPool;
   // The IOException will be set by response handling thread in case there is an
   // exception received in the response. If the exception is set, the next
   // request will fail upfront.
@@ -111,8 +110,6 @@ public class BlockOutputStream extends OutputStream {
   // map containing mapping for putBlock logIndex to flushedDataLength Map.
   private ConcurrentHashMap<Long, Long> commitIndex2flushedDataMap;
-  private int currentBufferIndex;
-
   private List<DatanodeDetails> failedServers;
 
   /**
@@ -124,7 +121,7 @@ public class BlockOutputStream extends OutputStream {
    * @param pipeline pipeline where block will be written
    * @param traceID container protocol call args
    * @param chunkSize chunk size
-   * @param bufferList list of byte buffers
+   * @param bufferPool pool of buffers
    * @param streamBufferFlushSize flush size
    * @param streamBufferMaxSize max size of the currentBuffer
    * @param watchTimeout watch timeout
@@ -135,7 +132,7 @@ public class BlockOutputStream extends OutputStream {
   public BlockOutputStream(BlockID blockID, String key,
       XceiverClientManager xceiverClientManager, Pipeline pipeline,
       String traceID, int chunkSize, long streamBufferFlushSize,
-      long streamBufferMaxSize, long watchTimeout, List<ByteBuffer> bufferList,
+      long streamBufferMaxSize, long watchTimeout, BufferPool bufferPool,
       ChecksumType checksumType, int bytesPerChecksum)
       throws IOException {
     this.blockID = blockID;
@@ -154,7 +151,7 @@ public class BlockOutputStream extends OutputStream {
     this.streamBufferFlushSize = streamBufferFlushSize;
     this.streamBufferMaxSize = streamBufferMaxSize;
     this.watchTimeout = watchTimeout;
-    this.bufferList = bufferList;
+    this.bufferPool = bufferPool;
     this.checksumType = checksumType;
     this.bytesPerChecksum = bytesPerChecksum;
 
@@ -164,7 +161,6 @@ public class BlockOutputStream extends OutputStream {
     totalAckDataLength = 0;
     futureMap = new ConcurrentHashMap<>();
     totalDataFlushedLength = 0;
-    currentBufferIndex = 0;
     writtenDataLength = 0;
     failedServers = Collections.emptyList();
   }
@@ -181,13 +177,6 @@ public class BlockOutputStream extends OutputStream {
     return writtenDataLength;
   }
 
-  private long computeBufferData() {
-    int dataLength =
-        bufferList.stream().mapToInt(Buffer::position).sum();
-    Preconditions.checkState(dataLength <= streamBufferMaxSize);
-    return dataLength;
-  }
-
   public List<DatanodeDetails> getFailedServers() {
     return failedServers;
   }
@@ -202,6 +191,7 @@ public class BlockOutputStream extends OutputStream {
 
   @Override
   public void write(byte[] b, int off, int len) throws IOException {
+    checkOpen();
     if (b == null) {
       throw new NullPointerException();
     }
@@ -213,53 +203,40 @@ public class BlockOutputStream extends OutputStream {
       return;
     }
     while (len > 0) {
-      checkOpen();
       int writeLen;
-      allocateBuffer();
-      ByteBuffer currentBuffer = getCurrentBuffer();
+
+      // Allocate a buffer if needed. The buffer will be allocated only
+      // once as needed and will be reused again for multiple blockOutputStream
+      // entries.
+      ByteBuffer currentBuffer = bufferPool.allocateBufferIfNeeded();
+      int pos = currentBuffer.position();
       writeLen =
-          Math.min(chunkSize - currentBuffer.position() % chunkSize, len);
+          Math.min(chunkSize - pos % chunkSize, len);
       currentBuffer.put(b, off, writeLen);
-      if (currentBuffer.position() % chunkSize == 0) {
-        int pos = currentBuffer.position() - chunkSize;
-        int limit = currentBuffer.position();
-        writeChunk(pos, limit);
+      if (!currentBuffer.hasRemaining()) {
+        writeChunk(currentBuffer);
       }
       off += writeLen;
       len -= writeLen;
       writtenDataLength += writeLen;
-      if (currentBuffer.position() == streamBufferFlushSize) {
+      if (shouldFlush()) {
        totalDataFlushedLength += streamBufferFlushSize;
        handlePartialFlush();
       }
-      long bufferedData = computeBufferData();
-      // Data in the bufferList can not exceed streamBufferMaxSize
-      if (bufferedData == streamBufferMaxSize) {
+      // Data in the bufferPool can not exceed streamBufferMaxSize
+      if (isBufferPoolFull()) {
        handleFullBuffer();
       }
     }
   }
 
-  private ByteBuffer getCurrentBuffer() {
-    ByteBuffer buffer = bufferList.get(currentBufferIndex);
-    if (!buffer.hasRemaining()) {
-      currentBufferIndex =
-          currentBufferIndex < getMaxNumBuffers() - 1 ? ++currentBufferIndex :
-              0;
-    }
-    return bufferList.get(currentBufferIndex);
-  }
-
-  private int getMaxNumBuffers() {
-    return (int)(streamBufferMaxSize/streamBufferFlushSize);
+  private boolean shouldFlush() {
+    return writtenDataLength % streamBufferFlushSize == 0;
   }
 
-  private void allocateBuffer() {
-    for (int i = bufferList.size(); i < getMaxNumBuffers(); i++) {
-      bufferList.add(ByteBuffer.allocate((int)streamBufferFlushSize));
-    }
+  private boolean isBufferPoolFull() {
+    return bufferPool.computeBufferData() == streamBufferMaxSize;
   }
-
   /**
    * Will be called on the retryPath in case closedContainerException/
    * TimeoutException.
@@ -272,36 +249,37 @@ public class BlockOutputStream extends OutputStream {
     if (len == 0) {
       return;
     }
-    int off = 0;
-    int pos = off;
+    int count = 0;
+    Preconditions.checkArgument(len <= streamBufferMaxSize);
     while (len > 0) {
       long writeLen;
       writeLen = Math.min(chunkSize, len);
       if (writeLen == chunkSize) {
-        int limit = pos + chunkSize;
-        writeChunk(pos, limit);
+        writeChunk(bufferPool.getBuffer(count));
       }
-      off += writeLen;
       len -= writeLen;
+      count++;
       writtenDataLength += writeLen;
-      if (off % streamBufferFlushSize == 0) {
-        // reset the position to zero as now we wll readng thhe next buffer in
-        // the list
-        pos = 0;
+      if (shouldFlush()) {
+        // reset the position to zero as now we will be reading the
+        // next buffer in the list
        totalDataFlushedLength += streamBufferFlushSize;
        handlePartialFlush();
       }
-      if (computeBufferData() % streamBufferMaxSize == 0) {
+
+      // we should not call isBufferFull here. The buffer might already be full
+      // as whole data is already cached in the buffer. We should just validate
+      // if we wrote data of size streamBufferMaxSize to call for handling
+      // full buffer condition.
+      if (writtenDataLength == streamBufferMaxSize) {
        handleFullBuffer();
       }
     }
   }
 
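A note for readers following the write path: the cursor arithmetic above is what keeps writes chunk-aligned no matter how the caller sizes its byte arrays. A standalone toy rendering of that arithmetic (illustrative only, not part of the patch; names are made up):

    // Each pass consumes at most the remainder of the current chunk,
    // so a chunk is emitted exactly when the position crosses a
    // chunk boundary.
    public final class ChunkMathDemo {
      public static void main(String[] args) {
        final int chunkSize = 4;
        int pos = 0;   // stands in for currentBuffer.position()
        int len = 10;  // bytes handed to write()
        while (len > 0) {
          int writeLen = Math.min(chunkSize - pos % chunkSize, len);
          pos += writeLen;
          len -= writeLen;
          System.out.println("wrote " + writeLen
              + (pos % chunkSize == 0 ? " -> chunk boundary, emit chunk" : ""));
        }
        // prints: "wrote 4 -> chunk boundary, emit chunk" twice, then "wrote 2"
      }
    }
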
   /**
-   * just update the totalAckDataLength. Since we have allocated
-   * the currentBuffer more than the streamBufferMaxSize, we can keep on writing
-   * to the currentBuffer. In case of failure, we will read the data starting
-   * from totalAckDataLength.
+   * just update the totalAckDataLength. In case of failure,
+   * we will read the data starting from totalAckDataLength.
    */
   private void updateFlushIndex(long index) {
     if (!commitIndex2flushedDataMap.isEmpty()) {
@@ -310,13 +288,15 @@ public class BlockOutputStream extends OutputStream {
       LOG.debug("Total data successfully replicated: " + totalAckDataLength);
       futureMap.remove(totalAckDataLength);
       // Flush has been committed to required servers successful.
-      // just swap the bufferList head and tail after clearing.
-      ByteBuffer currentBuffer = bufferList.remove(0);
-      currentBuffer.clear();
-      if (currentBufferIndex != 0) {
-        currentBufferIndex--;
+      // just release the current buffer from the buffer pool.
+
+      // every entry removed from the putBlock future Map signifies
+      // streamBufferFlushSize/chunkSize no of chunks successfully committed.
+      // Release the buffers from the buffer pool to be reused again.
+      int chunkCount = (int) (streamBufferFlushSize / chunkSize);
+      for (int i = 0; i < chunkCount; i++) {
+        bufferPool.releaseBuffer();
       }
-      bufferList.add(currentBuffer);
     }
   }
 
@@ -450,91 +430,85 @@ public class BlockOutputStream extends OutputStream {
   @Override
   public void flush() throws IOException {
     if (xceiverClientManager != null && xceiverClient != null
-        && bufferList != null) {
-      checkOpen();
-      int bufferSize = bufferList.size();
-      if (bufferSize > 0) {
-        try {
-          // flush the last chunk data residing on the currentBuffer
-          if (totalDataFlushedLength < writtenDataLength) {
-            ByteBuffer currentBuffer = getCurrentBuffer();
-            int pos = currentBuffer.position() - (currentBuffer.position()
-                % chunkSize);
-            int limit = currentBuffer.position() - pos;
-            writeChunk(pos, currentBuffer.position());
-            totalDataFlushedLength += limit;
-            handlePartialFlush();
-          }
-          waitOnFlushFutures();
-          // just check again if the exception is hit while waiting for the
-          // futures to ensure flush has indeed succeeded
-          checkOpen();
-        } catch (InterruptedException | ExecutionException e) {
-          adjustBuffersOnException();
-          throw new IOException(
-              "Unexpected Storage Container Exception: " + e.toString(), e);
-        }
+        && bufferPool != null && bufferPool.getSize() > 0) {
+      try {
+        handleFlush();
+      } catch (InterruptedException | ExecutionException e) {
+        adjustBuffersOnException();
+        throw new IOException(
+            "Unexpected Storage Container Exception: " + e.toString(), e);
       }
     }
   }
 
-  private void writeChunk(int pos, int limit) throws IOException {
+
+  private void writeChunk(ByteBuffer buffer)
+      throws IOException {
     // Please note : We are not flipping the slice when we write since
     // the slices are pointing the currentBuffer start and end as needed for
     // the chunk write. Also please note, Duplicate does not create a
     // copy of data, it only creates metadata that points to the data
     // stream.
-    ByteBuffer chunk = bufferList.get(currentBufferIndex).duplicate();
-    chunk.position(pos);
-    chunk.limit(limit);
+    ByteBuffer chunk = buffer.duplicate();
+    chunk.position(0);
+    chunk.limit(buffer.position());
     writeChunkToContainer(chunk);
   }
 
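Since writeChunk now hinges entirely on ByteBuffer view semantics, here is a minimal self-contained sketch (not from the patch) of why duplicate() plus position/limit is safe: the view shares the backing bytes but carries its own cursor, so carving out [0, position) for the chunk write never disturbs the writer's position.

    import java.nio.ByteBuffer;

    public final class DuplicateDemo {
      public static void main(String[] args) {
        ByteBuffer writer = ByteBuffer.allocate(8);
        writer.put(new byte[] {1, 2, 3, 4});    // writer.position() == 4

        // Independent cursor over the same bytes: no data is copied.
        ByteBuffer chunk = writer.duplicate();
        chunk.position(0);
        chunk.limit(writer.position());

        System.out.println(chunk.remaining());  // 4 bytes visible to the chunk
        System.out.println(writer.position());  // still 4, writer unaffected

        // Writes through one view are seen by the other (shared storage).
        writer.put(0, (byte) 9);
        System.out.println(chunk.get(0));       // 9
      }
    }
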
+  private void handleFlush()
+      throws IOException, InterruptedException, ExecutionException {
+    checkOpen();
+    // flush the last chunk data residing on the currentBuffer
+    if (totalDataFlushedLength < writtenDataLength) {
+      ByteBuffer currentBuffer = bufferPool.getBuffer();
+      int pos = currentBuffer.position();
+      writeChunk(currentBuffer);
+      totalDataFlushedLength += pos;
+      handlePartialFlush();
+    }
+    waitOnFlushFutures();
+    // just check again if the exception is hit while waiting for the
+    // futures to ensure flush has indeed succeeded
+
+    // irrespective of whether the commitIndex2flushedDataMap is empty
+    // or not, ensure there is no exception set
+    checkOpen();
+
+  }
+
   @Override
   public void close() throws IOException {
     if (xceiverClientManager != null && xceiverClient != null
-        && bufferList != null) {
-      int bufferSize = bufferList.size();
-      if (bufferSize > 0) {
-        try {
-          // flush the last chunk data residing on the currentBuffer
-          if (totalDataFlushedLength < writtenDataLength) {
-            ByteBuffer currentBuffer = getCurrentBuffer();
-            int pos = currentBuffer.position() - (currentBuffer.position()
-                % chunkSize);
-            int limit = currentBuffer.position() - pos;
-            writeChunk(pos, currentBuffer.position());
-            totalDataFlushedLength += limit;
-            handlePartialFlush();
-          }
-          waitOnFlushFutures();
-          // irrespective of whether the commitIndex2flushedDataMap is empty
-          // or not, ensure there is no exception set
-          checkOpen();
-          if (!commitIndex2flushedDataMap.isEmpty()) {
-            // wait for the last commit index in the commitIndex2flushedDataMap
-            // to get committed to all or majority of nodes in case timeout
-            // happens.
-            long lastIndex =
-                commitIndex2flushedDataMap.keySet().stream()
-                    .mapToLong(v -> v).max().getAsLong();
-            LOG.debug(
-                "waiting for last flush Index " + lastIndex + " to catch up");
-            watchForCommit(lastIndex);
-          }
-        } catch (InterruptedException | ExecutionException e) {
-          adjustBuffersOnException();
-          throw new IOException(
-              "Unexpected Storage Container Exception: " + e.toString(), e);
-        } finally {
-          cleanup(false);
+        && bufferPool != null && bufferPool.getSize() > 0) {
+      try {
+        handleFlush();
+        if (!commitIndex2flushedDataMap.isEmpty()) {
+          // wait for the last commit index in the commitIndex2flushedDataMap
+          // to get committed to all or majority of nodes in case timeout
+          // happens.
+          long lastIndex =
+              commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v)
+                  .max().getAsLong();
+          LOG.debug(
+              "waiting for last flush Index " + lastIndex + " to catch up");
+          watchForCommit(lastIndex);
         }
+      } catch (InterruptedException | ExecutionException e) {
+        adjustBuffersOnException();
+        throw new IOException(
+            "Unexpected Storage Container Exception: " + e.toString(), e);
+      } finally {
+        cleanup(false);
       }
-      // clear the currentBuffer
-      bufferList.stream().forEach(ByteBuffer::clear);
+      // TODO: Turn the below buffer empty check on when Standalone pipeline
+      // is removed in the write path in tests
+      // Preconditions.checkArgument(buffer.position() == 0);
+      // bufferPool.checkBufferPoolEmpty();
+
     }
   }
 
+
   private void waitOnFlushFutures()
       throws InterruptedException, ExecutionException {
     CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
new file mode 100644
index 0000000..541e6bd
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.base.Preconditions;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class creates and manages pool of n buffers.
+ */
+public class BufferPool {
+
+  private List<ByteBuffer> bufferList;
+  private int currentBufferIndex;
+  private final int bufferSize;
+  private final int capacity;
+
+  public BufferPool(int bufferSize, int capacity) {
+    this.capacity = capacity;
+    this.bufferSize = bufferSize;
+    bufferList = new ArrayList<>(capacity);
+    currentBufferIndex = -1;
+  }
+
+  public ByteBuffer getBuffer() {
+    return currentBufferIndex == -1 ? null : bufferList.get(currentBufferIndex);
+  }
+
+  /**
+   * If the currentBufferIndex is less than the buffer size - 1,
+   * it means, the next buffer in the list has been freed up for
+   * rewriting. Reuse the next available buffer in such cases.
+   *
+   * In case, the currentBufferIndex == buffer.size and buffer size is still
+   * less than the capacity to be allocated, just allocate a buffer of size
+   * chunk size.
+   *
+   */
+  public ByteBuffer allocateBufferIfNeeded() {
+    ByteBuffer buffer = getBuffer();
+    if (buffer != null && buffer.hasRemaining()) {
+      return buffer;
+    }
+    if (currentBufferIndex < bufferList.size() - 1) {
+      buffer = getBuffer(currentBufferIndex + 1);
+    } else {
+      buffer = ByteBuffer.allocate(bufferSize);
+      bufferList.add(buffer);
+    }
+    Preconditions.checkArgument(bufferList.size() <= capacity);
+    currentBufferIndex++;
+    // TODO: Turn the below precondition check on when Standalone pipeline
+    // is removed in the write path in tests
+    // Preconditions.checkArgument(buffer.position() == 0);
+    return buffer;
+  }
+
+  public void releaseBuffer() {
+    // always remove from head of the list and append at last
+    ByteBuffer buffer = bufferList.remove(0);
+    buffer.clear();
+    bufferList.add(buffer);
+    currentBufferIndex--;
+  }
+
+  public void clearBufferPool() {
+    bufferList.clear();
+    currentBufferIndex = -1;
+  }
+
+  public void checkBufferPoolEmpty() {
+    Preconditions.checkArgument(computeBufferData() == 0);
+  }
+
+  public long computeBufferData() {
+    return bufferList.stream().mapToInt(value -> value.position())
+        .sum();
+  }
+
+  public int getSize() {
+    return bufferList.size();
+  }
+
+  ByteBuffer getBuffer(int index) {
+    return bufferList.get(index);
+  }
+
+}
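To make the pool's rotation concrete, a short usage walkthrough of the class above (toy sizes; this snippet is illustrative and not part of the test suite):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hdds.scm.storage.BufferPool;

    public final class BufferPoolDemo {
      public static void main(String[] args) {
        // Two 4-byte buffers; in real usage bufferSize == chunkSize.
        BufferPool pool = new BufferPool(4, 2);

        ByteBuffer b0 = pool.allocateBufferIfNeeded(); // lazily allocates buffer 0
        b0.put(new byte[4]);                           // fill it completely

        ByteBuffer b1 = pool.allocateBufferIfNeeded(); // b0 is full -> buffer 1
        System.out.println(b1 != b0);                  // true

        pool.releaseBuffer(); // clears buffer 0 and rotates it to the tail

        // b1 still has room, so the current buffer is reused, not reallocated.
        System.out.println(pool.allocateBufferIfNeeded() == b1); // true
      }
    }
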
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 16a825f..b6de8ab 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -19,8 +19,6 @@ package org.apache.hadoop.ozone.client.io;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.List;
 
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -29,6 +27,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
+import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -58,14 +57,14 @@ public final class BlockOutputStreamEntry extends OutputStream {
   private final long streamBufferFlushSize;
   private final long streamBufferMaxSize;
   private final long watchTimeout;
-  private List<ByteBuffer> bufferList;
+  private BufferPool bufferPool;
 
   @SuppressWarnings("parameternumber")
   private BlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientManager xceiverClientManager,
       Pipeline pipeline, String requestId, int chunkSize,
       long length, long streamBufferFlushSize, long streamBufferMaxSize,
-      long watchTimeout, List<ByteBuffer> bufferList,
+      long watchTimeout, BufferPool bufferPool,
       ChecksumType checksumType, int bytesPerChecksum,
       Token<OzoneBlockTokenIdentifier> token) {
     this.outputStream = null;
@@ -81,7 +80,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
     this.streamBufferFlushSize = streamBufferFlushSize;
     this.streamBufferMaxSize = streamBufferMaxSize;
     this.watchTimeout = watchTimeout;
-    this.bufferList = bufferList;
+    this.bufferPool = bufferPool;
     this.checksumType = checksumType;
     this.bytesPerChecksum = bytesPerChecksum;
   }
 
@@ -112,7 +111,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
       this.outputStream =
           new BlockOutputStream(blockID, key, xceiverClientManager,
               pipeline, requestId, chunkSize, streamBufferFlushSize,
-              streamBufferMaxSize, watchTimeout, bufferList, checksumType,
+              streamBufferMaxSize, watchTimeout, bufferPool, checksumType,
               bytesPerChecksum);
     }
   }
@@ -212,7 +211,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
     private long streamBufferFlushSize;
     private long streamBufferMaxSize;
     private long watchTimeout;
-    private List<ByteBuffer> bufferList;
+    private BufferPool bufferPool;
     private Token<OzoneBlockTokenIdentifier> token;
     private ChecksumType checksumType;
     private int bytesPerChecksum;
@@ -278,8 +277,8 @@ public final class BlockOutputStreamEntry extends OutputStream {
       return this;
     }
 
-    public Builder setBufferList(List<ByteBuffer> bffrLst) {
-      this.bufferList = bffrLst;
+    public Builder setbufferPool(BufferPool pool) {
+      this.bufferPool = pool;
       return this;
     }
 
@@ -292,7 +291,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
       return new BlockOutputStreamEntry(blockID, key,
           xceiverClientManager, pipeline, requestId, chunkSize,
           length, streamBufferFlushSize, streamBufferMaxSize, watchTimeout,
-          bufferList, checksumType, bytesPerChecksum, token);
+          bufferPool, checksumType, bytesPerChecksum, token);
     }
   }
 
@@ -340,8 +339,8 @@ public final class BlockOutputStreamEntry extends OutputStream {
     return watchTimeout;
   }
 
-  public List<ByteBuffer> getBufferList() {
-    return bufferList;
+  public BufferPool getBufferPool() {
+    return bufferPool;
   }
 
   public void setCurrentPosition(long curPosition) {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 7035c73..d1acbe1 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -45,7 +46,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Collection;
@@ -83,7 +83,7 @@ public class KeyOutputStream extends OutputStream {
   private final long blockSize;
   private final int bytesPerChecksum;
   private final ChecksumType checksumType;
-  private List<ByteBuffer> bufferList;
+  private final BufferPool bufferPool;
   private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
   private FileEncryptionInfo feInfo;
   private ExcludeList excludeList;
@@ -104,9 +104,7 @@ public class KeyOutputStream extends OutputStream {
     closed = false;
     streamBufferFlushSize = 0;
     streamBufferMaxSize = 0;
-    bufferList = new ArrayList<>(1);
-    ByteBuffer buffer = ByteBuffer.allocate(1);
-    bufferList.add(buffer);
+    bufferPool = new BufferPool(chunkSize, 1);
     watchTimeout = 0;
     blockSize = 0;
     this.checksumType = ChecksumType.valueOf(
@@ -182,7 +180,8 @@ public class KeyOutputStream extends OutputStream {
     Preconditions.checkState(streamBufferFlushSize % chunkSize == 0);
     Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0);
     Preconditions.checkState(blockSize % streamBufferMaxSize == 0);
-    this.bufferList = new ArrayList<>();
+    this.bufferPool =
+        new BufferPool(chunkSize, (int)streamBufferMaxSize / chunkSize);
     this.excludeList = new ExcludeList();
   }
 
@@ -228,7 +227,7 @@ public class KeyOutputStream extends OutputStream {
           .setStreamBufferFlushSize(streamBufferFlushSize)
           .setStreamBufferMaxSize(streamBufferMaxSize)
           .setWatchTimeout(watchTimeout)
-          .setBufferList(bufferList)
+          .setbufferPool(bufferPool)
           .setChecksumType(checksumType)
           .setBytesPerChecksum(bytesPerChecksum)
           .setToken(subKeyInfo.getToken());
@@ -272,8 +271,7 @@ public class KeyOutputStream extends OutputStream {
   }
 
   private long computeBufferData() {
-    return bufferList.stream().mapToInt(value -> value.position())
-        .sum();
+    return bufferPool.computeBufferData();
   }
 
   private void handleWrite(byte[] b, int off, long len, boolean retry)
@@ -580,10 +578,7 @@ public class KeyOutputStream extends OutputStream {
     } catch (IOException ioe) {
       throw ioe;
     } finally {
-      if (bufferList != null) {
-        bufferList.stream().forEach(e -> e.clear());
-      }
-      bufferList = null;
+      bufferPool.clearBufferPool();
     }
   }
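The pool sizing above is worth spelling out: one buffer per chunk, with client memory capped at streamBufferMaxSize. A small worked example with illustrative values (assumptions, not Ozone defaults):

    public final class PoolSizingDemo {
      public static void main(String[] args) {
        long chunkSize = 1L << 20;                            // 1 MB chunks
        long streamBufferFlushSize = 4 * chunkSize;           // putBlock every 4 chunks
        long streamBufferMaxSize = 2 * streamBufferFlushSize; // at most 8 chunks buffered
        long blockSize = 16 * streamBufferMaxSize;

        // The same invariants KeyOutputStream asserts before building the pool.
        if (streamBufferFlushSize % chunkSize != 0
            || streamBufferMaxSize % streamBufferFlushSize != 0
            || blockSize % streamBufferMaxSize != 0) {
          throw new IllegalStateException("sizes must nest evenly");
        }

        // One buffer per chunk: 8 x 1 MB here, so unacknowledged data held
        // on the client is bounded by streamBufferMaxSize.
        int capacity = (int) (streamBufferMaxSize / chunkSize);
        System.out.println(capacity); // 8
      }
    }
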
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
index bf4f4d4..396351d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
@@ -87,6 +87,7 @@ public class TestCloseContainerHandlingByClient {
     conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
     conf.setQuietMode(false);
     conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
         StorageUnit.MB);
@@ -133,7 +134,7 @@ public class TestCloseContainerHandlingByClient {
         .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key);
+    waitForContainerClose(key);
     key.write(data);
     key.flush();
     key.close();
@@ -166,7 +167,7 @@ public class TestCloseContainerHandlingByClient {
         .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key);
+    waitForContainerClose(key);
     key.close();
     // read the key from OM again and match the length.The length will still
     // be the equal to the original data size.
@@ -180,12 +181,12 @@ public class TestCloseContainerHandlingByClient {
     String keyName = getKeyName();
     OzoneOutputStream key =
-        createKey(keyName, ReplicationType.RATIS, (4 * blockSize));
+        createKey(keyName, ReplicationType.RATIS, (3 * blockSize));
     KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();
 
     // With the initial size provided, it should have preallocated 4 blocks
-    Assert.assertEquals(4, keyOutputStream.getStreamEntries().size());
-    // write data more than 1 chunk
+    Assert.assertEquals(3, keyOutputStream.getStreamEntries().size());
+    // write data more than 1 block
     byte[] data =
         ContainerTestHelper.getFixedLengthString(keyString, (3 * blockSize))
             .getBytes(UTF_8);
@@ -199,7 +200,7 @@ public class TestCloseContainerHandlingByClient {
         .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key);
+    waitForContainerClose(key);
     // write 1 more block worth of data. It will fail and new block will be
     // allocated
     key.write(ContainerTestHelper.getFixedLengthString(keyString, blockSize)
@@ -258,7 +259,7 @@ public class TestCloseContainerHandlingByClient {
         .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key);
+    waitForContainerClose(key);
     key.close();
 
     // read the key from OM again and match the length.The length will still
@@ -301,7 +302,7 @@ public class TestCloseContainerHandlingByClient {
         .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key);
+    waitForContainerClose(key);
     // write 3 more chunks worth of data. It will fail and new block will be
     // allocated. This write completes 4 blocks worth of data written to key
     data = Arrays.copyOfRange(writtenData, 3 * blockSize + chunkSize, keyLen);
@@ -330,8 +331,7 @@ public class TestCloseContainerHandlingByClient {
     Assert.assertEquals(4 * blockSize, length);
   }
 
-  private void waitForContainerClose(String keyName,
-      OzoneOutputStream outputStream)
+  private void waitForContainerClose(OzoneOutputStream outputStream)
       throws Exception {
     KeyOutputStream keyOutputStream =
         (KeyOutputStream) outputStream.getOutputStream();
@@ -375,7 +375,7 @@ public class TestCloseContainerHandlingByClient {
         .getPipeline(container.getPipelineID());
     List<DatanodeDetails> datanodes = pipeline.getNodes();
     Assert.assertEquals(1, datanodes.size());
-    waitForContainerClose(keyName, key);
+    waitForContainerClose(key);
     dataString =
         ContainerTestHelper.getFixedLengthString(keyString, (1 * blockSize));
     data = dataString.getBytes(UTF_8);
@@ -421,7 +421,7 @@ public class TestCloseContainerHandlingByClient {
         .build();
     Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
-    waitForContainerClose(keyName, key);
+    waitForContainerClose(key);
     // Again Write the Data. This will throw an exception which will be handled
     // and new blocks will be allocated
     key.write(data);
     key.flush();
@@ -435,4 +435,42 @@ public class TestCloseContainerHandlingByClient {
     Assert.assertEquals(2 * data.length, keyInfo.getDataSize());
     validateData(keyName, dataString.getBytes(UTF_8));
   }
+
+  @Test
+  public void testBlockWrites() throws Exception {
+    String keyName = getKeyName();
+    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+    // write data more than 1 chunk
+    byte[] data1 =
+        ContainerTestHelper.getFixedLengthString(keyString, 2 * chunkSize)
+            .getBytes(UTF_8);
+    key.write(data1);
+
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    //get the name of a valid container
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
+        .build();
+
+    waitForContainerClose(key);
+    byte[] data2 =
+        ContainerTestHelper.getFixedLengthString(keyString, 3 * chunkSize)
+            .getBytes(UTF_8);
+    key.write(data2);
+    key.flush();
+    key.close();
+    // read the key from OM again and match the length. The length will still
+    // be equal to the original data size.
+    OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+    Assert.assertEquals(5 * chunkSize, keyInfo.getDataSize());
+
+    // Written the same data twice
+    String dataString = new String(data1, UTF_8);
+    String dataString2 = new String(data2, UTF_8);
+    dataString = dataString.concat(dataString2);
+    validateData(keyName, dataString.getBytes(UTF_8));
+  }
+
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index 78f7d29..41d87a2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -142,7 +142,7 @@ public class TestBlockDeletion {
 
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
         .setBucketName(bucketName).setKeyName(keyName).setDataSize(0)
-        .setType(HddsProtos.ReplicationType.STAND_ALONE)
+        .setType(HddsProtos.ReplicationType.RATIS)
         .setFactor(HddsProtos.ReplicationFactor.ONE).build();
     List<OmKeyLocationInfoGroup> omKeyLocationInfoGroupList =
         om.lookupKey(keyArgs).getKeyLocationVersions();