This is an automated email from the ASF dual-hosted git repository.

aengineer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 0883ce1  HDDS-2026. Overlapping chunk region cannot be read concurrently
0883ce1 is described below

commit 0883ce102113cdc9527ab8aa548895a8418cb6bb
Author: Doroszlai, Attila <adorosz...@apache.org>
AuthorDate: Mon Aug 26 12:59:47 2019 +0200

    HDDS-2026. Overlapping chunk region cannot be read concurrently

    Signed-off-by: Anu Engineer <aengin...@apache.org>
---
 .../container/keyvalue/helpers/ChunkUtils.java     | 188 +++++++++++----------
 .../container/keyvalue/helpers/TestChunkUtils.java | 164 ++++++++++++++++++
 2 files changed, 267 insertions(+), 85 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
index 2993bbb..a043cdc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.container.keyvalue.helpers;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
@@ -36,17 +37,20 @@ import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl;
 import org.apache.hadoop.ozone.container.common.volume.VolumeIOStats;
 import org.apache.hadoop.util.Time;
+import org.apache.ratis.util.function.CheckedSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.AsynchronousFileChannel;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
+import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.security.NoSuchAlgorithmException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.*;
@@ -56,6 +60,8 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Res
  */
 public final class ChunkUtils {
 
+  private static final Set<Path> LOCKS = ConcurrentHashMap.newKeySet();
+
   /** Never constructed. **/
   private ChunkUtils() {
 
@@ -67,9 +73,8 @@ public final class ChunkUtils {
    * @param chunkFile - File to write data to.
    * @param chunkInfo - Data stream to write.
    * @param data - The data buffer.
-   * @param volumeIOStats
+   * @param volumeIOStats statistics collector
    * @param sync whether to do fsync or not
-   * @throws StorageContainerException
    */
   public static void writeData(File chunkFile, ChunkInfo chunkInfo,
       ByteBuffer data, VolumeIOStats volumeIOStats, boolean sync)
@@ -85,58 +90,43 @@ public final class ChunkUtils {
       throw new StorageContainerException(err, INVALID_WRITE_SIZE);
     }
 
-    FileChannel file = null;
-    FileLock lock = null;
+    Path path = chunkFile.toPath();
+    long startTime = Time.monotonicNow();
+    processFileExclusively(path, () -> {
+      FileChannel file = null;
+      try {
+        // skip SYNC and DSYNC to reduce contention on file.lock
+        file = FileChannel.open(path,
+            StandardOpenOption.CREATE,
+            StandardOpenOption.WRITE,
+            StandardOpenOption.SPARSE);
+
+        int size;
+        try (FileLock ignored = file.lock()) {
+          size = file.write(data, chunkInfo.getOffset());
+        }
 
-    try {
-      long writeTimeStart = Time.monotonicNow();
-
-      // skip SYNC and DSYNC to reduce contention on file.lock
-      file = FileChannel.open(chunkFile.toPath(),
-          StandardOpenOption.CREATE,
-          StandardOpenOption.WRITE,
-          StandardOpenOption.SPARSE);
-
-      lock = file.lock();
-      int size = file.write(data, chunkInfo.getOffset());
-      // Increment volumeIO stats here.
-      volumeIOStats.incWriteTime(Time.monotonicNow() - writeTimeStart);
-      volumeIOStats.incWriteOpCount();
-      volumeIOStats.incWriteBytes(size);
-      if (size != bufferSize) {
-        log.error("Invalid write size found. Size:{} Expected: {} ", size,
-            bufferSize);
-        throw new StorageContainerException("Invalid write size found. " +
-            "Size: " + size + " Expected: " + bufferSize, INVALID_WRITE_SIZE);
+        // Increment volumeIO stats here.
+        volumeIOStats.incWriteTime(Time.monotonicNow() - startTime);
+        volumeIOStats.incWriteOpCount();
+        volumeIOStats.incWriteBytes(size);
+        if (size != bufferSize) {
+          log.error("Invalid write size found. Size:{} Expected: {} ", size,
+              bufferSize);
+          throw new StorageContainerException("Invalid write size found. " +
+              "Size: " + size + " Expected: " + bufferSize, INVALID_WRITE_SIZE);
+        }
+      } catch (StorageContainerException ex) {
+        throw ex;
+      } catch (IOException e) {
+        throw new StorageContainerException(e, IO_EXCEPTION);
+      } finally {
+        closeFile(file, sync);
       }
-    } catch (StorageContainerException ex) {
-      throw ex;
-    } catch(IOException e) {
-      throw new StorageContainerException(e, IO_EXCEPTION);
-    } finally {
-      if (lock != null) {
-        try {
-          lock.release();
-        } catch (IOException e) {
-          log.error("Unable to release lock ??, Fatal Error.");
-          throw new StorageContainerException(e, CONTAINER_INTERNAL_ERROR);
+      return null;
+    });
 
-        }
-      }
-      if (file != null) {
-        try {
-          if (sync) {
-            // ensure data and metadata is persisted. Outside the lock
-            file.force(true);
-          }
-          file.close();
-        } catch (IOException e) {
-          throw new StorageContainerException("Error closing chunk file",
-              e, CONTAINER_INTERNAL_ERROR);
-        }
-      }
-    }
     log.debug("Write Chunk completed for chunkFile: {}, size {}", chunkFile,
         bufferSize);
   }
@@ -146,11 +136,8 @@ public final class ChunkUtils {
    *
    * @param chunkFile - file where data lives.
    * @param data - chunk definition.
-   * @param volumeIOStats
+   * @param volumeIOStats statistics collector
    * @return ByteBuffer
-   * @throws StorageContainerException
-   * @throws ExecutionException
-   * @throws InterruptedException
    */
   public static ByteBuffer readData(File chunkFile, ChunkInfo data,
       VolumeIOStats volumeIOStats) throws StorageContainerException,
@@ -165,38 +152,36 @@
           data.toString(), UNABLE_TO_FIND_CHUNK);
     }
 
-    AsynchronousFileChannel file = null;
-    FileLock lock = null;
-    try {
-      long readStartTime = Time.monotonicNow();
-      file =
-          AsynchronousFileChannel.open(chunkFile.toPath(),
-              StandardOpenOption.READ);
-      lock = file.lock(data.getOffset(), data.getLen(), true).get();
-
-      ByteBuffer buf = ByteBuffer.allocate((int) data.getLen());
-      file.read(buf, data.getOffset()).get();
-
-      // Increment volumeIO stats here.
-      volumeIOStats.incReadTime(Time.monotonicNow() - readStartTime);
-      volumeIOStats.incReadOpCount();
-      volumeIOStats.incReadBytes(data.getLen());
-
-      return buf;
-    } catch (IOException e) {
-      throw new StorageContainerException(e, IO_EXCEPTION);
-    } finally {
-      if (lock != null) {
-        try {
-          lock.release();
-        } catch (IOException e) {
-          log.error("I/O error is lock release.");
+    long offset = data.getOffset();
+    long len = data.getLen();
+    ByteBuffer buf = ByteBuffer.allocate((int) len);
+
+    Path path = chunkFile.toPath();
+    long startTime = Time.monotonicNow();
+    return processFileExclusively(path, () -> {
+      FileChannel file = null;
+
+      try {
+        file = FileChannel.open(path, StandardOpenOption.READ);
+
+        try (FileLock ignored = file.lock(offset, len, true)) {
+          file.read(buf, offset);
+        }
+
+        // Increment volumeIO stats here.
+        volumeIOStats.incReadTime(Time.monotonicNow() - startTime);
+        volumeIOStats.incReadOpCount();
+        volumeIOStats.incReadBytes(len);
+
+        return buf;
+      } catch (IOException e) {
+        throw new StorageContainerException(e, IO_EXCEPTION);
+      } finally {
+        if (file != null) {
+          IOUtils.closeStream(file);
         }
       }
-      if (file != null) {
-        IOUtils.closeStream(file);
-      }
-    }
+    });
   }
 
   /**
@@ -326,4 +311,37 @@ public final class ChunkUtils {
     builder.setReadChunk(response);
     return builder.build();
   }
+
+  @VisibleForTesting
+  static <T, E extends Exception> T processFileExclusively(
+      Path path, CheckedSupplier<T, E> op
+  ) throws E {
+    for (;;) {
+      if (LOCKS.add(path)) {
+        break;
+      }
+    }
+
+    try {
+      return op.get();
+    } finally {
+      LOCKS.remove(path);
+    }
+  }
+
+  private static void closeFile(FileChannel file, boolean sync)
+      throws StorageContainerException {
+    if (file != null) {
+      try {
+        if (sync) {
+          // ensure data and metadata is persisted
+          file.force(true);
+        }
+        file.close();
+      } catch (IOException e) {
+        throw new StorageContainerException("Error closing chunk file",
+            e, CONTAINER_INTERNAL_ERROR);
+      }
+    }
+  }
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java
new file mode 100644
index 0000000..4a1637c
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.keyvalue.helpers;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.volume.VolumeIOStats;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Tests for {@link ChunkUtils}.
+ */
+public class TestChunkUtils {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestChunkUtils.class);
+
+  private static final String PREFIX = TestChunkUtils.class.getSimpleName();
+
+  @Test
+  public void concurrentReadOfSameFile() throws Exception {
+    String s = "Hello World";
+    byte[] array = s.getBytes();
+    ByteBuffer data = ByteBuffer.wrap(array);
+    Path tempFile = Files.createTempFile(PREFIX, "concurrent");
+    try {
+      ChunkInfo chunkInfo = new ChunkInfo(tempFile.toString(),
+          0, data.capacity());
+      File file = tempFile.toFile();
+      VolumeIOStats stats = new VolumeIOStats();
+      ChunkUtils.writeData(file, chunkInfo, data, stats, true);
+      int threads = 10;
+      ExecutorService executor = new ThreadPoolExecutor(threads, threads,
+          0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
+      AtomicInteger processed = new AtomicInteger();
+      AtomicBoolean failed = new AtomicBoolean();
+      for (int i = 0; i < threads; i++) {
+        final int threadNumber = i;
+        executor.submit(() -> {
+          try {
+            ByteBuffer readBuffer = ChunkUtils.readData(file, chunkInfo, stats);
+            LOG.info("Read data ({}): {}", threadNumber,
+                new String(readBuffer.array()));
+            if (!Arrays.equals(array, readBuffer.array())) {
+              failed.set(true);
+            }
+          } catch (Exception e) {
+            LOG.error("Failed to read data ({})", threadNumber, e);
+            failed.set(true);
+          }
+          processed.incrementAndGet();
+        });
+      }
+      try {
+        GenericTestUtils.waitFor(() -> processed.get() == threads,
+            100, (int) TimeUnit.SECONDS.toMillis(5));
+      } finally {
+        executor.shutdownNow();
+      }
+      assertEquals(threads * stats.getWriteBytes(), stats.getReadBytes());
+      assertFalse(failed.get());
+    } finally {
+      Files.deleteIfExists(tempFile);
+    }
+  }
+
+  @Test
+  public void concurrentProcessing() throws Exception {
+    final int perThreadWait = 1000;
+    final int maxTotalWait = 5000;
+    int threads = 20;
+    List<Path> paths = new LinkedList<>();
+
+    try {
+      ExecutorService executor = new ThreadPoolExecutor(threads, threads,
+          0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
+      AtomicInteger processed = new AtomicInteger();
+      for (int i = 0; i < threads; i++) {
+        Path path = Files.createTempFile(PREFIX, String.valueOf(i));
+        paths.add(path);
+        executor.submit(() -> {
+          ChunkUtils.processFileExclusively(path, () -> {
+            try {
+              Thread.sleep(perThreadWait);
+            } catch (InterruptedException e) {
+              e.printStackTrace();
+            }
+            processed.incrementAndGet();
+            return null;
+          });
+        });
+      }
+      try {
+        GenericTestUtils.waitFor(() -> processed.get() == threads,
+            100, maxTotalWait);
+      } finally {
+        executor.shutdownNow();
+      }
+    } finally {
+      for (Path path : paths) {
+        FileUtils.deleteQuietly(path.toFile());
+      }
+    }
+  }
+
+  @Test
+  public void serialRead() throws Exception {
+    String s = "Hello World";
+    byte[] array = s.getBytes();
+    ByteBuffer data = ByteBuffer.wrap(array);
+    Path tempFile = Files.createTempFile(PREFIX, "serial");
+    try {
+      ChunkInfo chunkInfo = new ChunkInfo(tempFile.toString(),
+          0, data.capacity());
+      File file = tempFile.toFile();
+      VolumeIOStats stats = new VolumeIOStats();
+      ChunkUtils.writeData(file, chunkInfo, data, stats, true);
+      ByteBuffer readBuffer = ChunkUtils.readData(file, chunkInfo, stats);
+      assertArrayEquals(array, readBuffer.array());
+      assertEquals(stats.getWriteBytes(), stats.getReadBytes());
+    } catch (Exception e) {
+      LOG.error("Failed to read data", e);
+    } finally {
+      Files.deleteIfExists(tempFile);
+    }
+  }
+
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org
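Note for readers skimming the diff above: the heart of the change is the new ChunkUtils.processFileExclusively helper, which serializes all access to a given chunk file through a ConcurrentHashMap-backed set of in-use paths, while the shared FileLock regions guard the actual byte ranges. What follows is a minimal, self-contained sketch of that per-path exclusion pattern, not part of the commit: the class name ExclusionDemo, the main method, and the use of java.util.function.Supplier in place of Ratis' CheckedSupplier are illustrative assumptions.

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

public final class ExclusionDemo {

  // One process-wide set of paths currently being processed.
  // Set.add returns true for exactly one caller per absent element,
  // so at most one thread "holds" a given path at a time.
  private static final Set<Path> LOCKS = ConcurrentHashMap.newKeySet();

  static <T> T processFileExclusively(Path path, Supplier<T> op) {
    // Busy-wait until add() succeeds, mirroring the commit's spin loop;
    // chunk operations are short-lived, so the contention window is small.
    for (;;) {
      if (LOCKS.add(path)) {
        break;
      }
    }
    try {
      return op.get();
    } finally {
      // Always release the path, even if op throws.
      LOCKS.remove(path);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Path path = Paths.get("demo.chunk");
    Runnable task = () -> processFileExclusively(path, () -> {
      System.out.println(Thread.currentThread().getName()
          + " holds exclusive access to " + path);
      return null;
    });
    Thread t1 = new Thread(task);
    Thread t2 = new Thread(task); // spins until t1 removes the path
    t1.start();
    t2.start();
    t1.join();
    t2.join();
  }
}

Threads working on distinct paths never contend here; only operations on the same file are serialized, which is why the commit's TestChunkUtils.concurrentProcessing (twenty files processed in well under twenty seconds of per-file sleeps) passes.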