adoroszlai commented on a change in pull request #438: HDDS-2878. Refactor
MiniOzoneLoadGenerator to add more load generators to chaos testing.
URL: https://github.com/apache/hadoop-ozone/pull/438#discussion_r394929910
##########
File path:
hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneLoadGenerator.java
##########
@@ -47,212 +42,61 @@
LoggerFactory.getLogger(MiniOzoneLoadGenerator.class);
private static String keyNameDelimiter = "_";
-
- private ThreadPoolExecutor writeExecutor;
- private int numThreads;
- // number of buffer to be allocated, each is allocated with length which
- // is multiple of 2, each buffer is populated with random data.
- private int numBuffers;
- private List<ByteBuffer> buffers;
-
- private AtomicBoolean isIOThreadRunning;
-
- private final List<LoadBucket> ozoneBuckets;
-
- private final AtomicInteger agedFileWrittenIndex;
- private final ExecutorService agedFileExecutor;
- private final LoadBucket agedLoadBucket;
- private final TestProbability agedWriteProbability;
-
- private final ThreadPoolExecutor fsExecutor;
- private final LoadBucket fsBucket;
-
- MiniOzoneLoadGenerator(List<LoadBucket> bucket,
- LoadBucket agedLoadBucket, LoadBucket fsBucket,
- int numThreads, int numBuffers) {
- this.ozoneBuckets = bucket;
- this.numThreads = numThreads;
- this.numBuffers = numBuffers;
- this.writeExecutor = createExecutor();
-
- this.agedFileWrittenIndex = new AtomicInteger(0);
- this.agedFileExecutor = Executors.newSingleThreadExecutor();
- this.agedLoadBucket = agedLoadBucket;
- this.agedWriteProbability = TestProbability.valueOf(10);
-
- this.fsExecutor = createExecutor();
- this.fsBucket = fsBucket;
-
- this.isIOThreadRunning = new AtomicBoolean(false);
-
- // allocate buffers and populate random data.
- buffers = new ArrayList<>();
- for (int i = 0; i < numBuffers; i++) {
- int size = (int) StorageUnit.KB.toBytes(1 << i);
- ByteBuffer buffer = ByteBuffer.allocate(size);
- buffer.put(RandomUtils.nextBytes(size));
- buffers.add(buffer);
- }
- }
-
- private ThreadPoolExecutor createExecutor() {
- ThreadPoolExecutor executor = new ThreadPoolExecutor(numThreads, numThreads,
- 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024),
- new ThreadPoolExecutor.CallerRunsPolicy());
- executor.prestartAllCoreThreads();
- return executor;
-
- }
-
- // Start IO load on an Ozone bucket.
- private void load(long runTimeMillis) {
- long threadID = Thread.currentThread().getId();
- LOG.info("Started Mixed IO Thread:{}.", threadID);
- String threadName = Thread.currentThread().getName();
- long startTime = Time.monotonicNow();
-
- while (isIOThreadRunning.get() &&
- (Time.monotonicNow() < startTime + runTimeMillis)) {
- LoadBucket bucket =
- ozoneBuckets.get((int) (Math.random() * ozoneBuckets.size()));
- try {
- int index = RandomUtils.nextInt();
- ByteBuffer buffer = getBuffer(index);
- String keyName = getKeyName(index, threadName);
- bucket.writeKey(buffer, keyName);
-
- bucket.readKey(buffer, keyName);
-
- bucket.deleteKey(keyName);
- } catch (Exception e) {
- LOG.error("LOADGEN: Exiting due to exception", e);
- break;
- }
- }
- // This will terminate other threads too.
- isIOThreadRunning.set(false);
- LOG.info("Terminating IO thread:{}.", threadID);
- }
-
- private Optional<Integer> randomKeyToRead() {
- int currentIndex = agedFileWrittenIndex.get();
- return currentIndex != 0
- ? Optional.of(RandomUtils.nextInt(0, currentIndex))
- : Optional.empty();
- }
-
- private void startAgedLoad(long runTimeMillis) {
- long threadID = Thread.currentThread().getId();
- LOG.info("AGED LOADGEN: Started Aged IO Thread:{}.", threadID);
- String threadName = Thread.currentThread().getName();
- long startTime = Time.monotonicNow();
-
- while (isIOThreadRunning.get() &&
- (Time.monotonicNow() < startTime + runTimeMillis)) {
-
- String keyName = null;
- try {
- if (agedWriteProbability.isTrue()) {
- int index = agedFileWrittenIndex.getAndIncrement();
- ByteBuffer buffer = getBuffer(index);
- keyName = getKeyName(index, threadName);
-
- agedLoadBucket.writeKey(buffer, keyName);
- } else {
- Optional<Integer> index = randomKeyToRead();
- if (index.isPresent()) {
- ByteBuffer buffer = getBuffer(index.get());
- keyName = getKeyName(index.get(), threadName);
- agedLoadBucket.readKey(buffer, keyName);
- }
- }
- } catch (Throwable t) {
- LOG.error("AGED LOADGEN: {} Exiting due to exception", keyName, t);
- break;
- }
- }
- // This will terminate other threads too.
- isIOThreadRunning.set(false);
- LOG.info("Terminating IO thread:{}.", threadID);
- }
-
- // Start IO load on an Ozone bucket.
- private void startFsLoad(long runTimeMillis) {
- long threadID = Thread.currentThread().getId();
- LOG.info("Started Filesystem IO Thread:{}.", threadID);
- String threadName = Thread.currentThread().getName();
- long startTime = Time.monotonicNow();
-
- while (isIOThreadRunning.get() &&
- (Time.monotonicNow() < startTime + runTimeMillis)) {
- try {
- int index = RandomUtils.nextInt();
- ByteBuffer buffer = getBuffer(index);
- String keyName = getKeyName(index, threadName);
- fsBucket.writeKey(true, buffer, keyName);
-
- fsBucket.readKey(true, buffer, keyName);
-
- fsBucket.deleteKey(true, keyName);
- } catch (Exception e) {
- LOG.error("LOADGEN: Exiting due to exception", e);
- break;
- }
+ private final List<LoadExecutors> loadExecutors;
+
+ MiniOzoneLoadGenerator(OzoneVolume volume, int numClients, int numThreads,
+ int numBuffers, OzoneConfiguration conf)
+ throws Exception {
+ DataBuffer buffer = new DataBuffer(numBuffers);
+ loadExecutors = new ArrayList<>();
+
+ // Random Load
+ String mixBucketName = RandomStringUtils.randomAlphabetic(10).toLowerCase();
+ volume.createBucket(mixBucketName);
+ List<LoadBucket> ozoneBuckets = new ArrayList<>(numClients);
+ for (int i = 0; i < numClients; i++) {
+ ozoneBuckets.add(new LoadBucket(volume.getBucket(mixBucketName),
+ conf));
}
- // This will terminate other threads too.
- isIOThreadRunning.set(false);
- LOG.info("Terminating IO thread:{}.", threadID);
+ RandomLoadGenerator loadGenerator =
+ new RandomLoadGenerator(buffer, ozoneBuckets);
+ loadExecutors.add(new LoadExecutors(numThreads, loadGenerator));
+
+ // Aged Load
+ String agedBucketName =
+ RandomStringUtils.randomAlphabetic(10).toLowerCase();
+ volume.createBucket(agedBucketName);
+ LoadBucket agedLoadBucket =
+ new LoadBucket(volume.getBucket(agedBucketName), conf);
+ AgedLoadGenerator agedLoadGenerator =
+ new AgedLoadGenerator(buffer, agedLoadBucket);
+ loadExecutors.add(new LoadExecutors(numThreads, agedLoadGenerator));
+
+ //Filesystem Load
+ String fsBucketName =
+ RandomStringUtils.randomAlphabetic(10).toLowerCase();
+
+ volume.createBucket(fsBucketName);
+ LoadBucket fsBucket =
+ new LoadBucket(volume.getBucket(fsBucketName), conf);
+ FilesystemLoadGenerator filesystemLoadGenerator =
+ new FilesystemLoadGenerator(buffer, fsBucket);
+ loadExecutors.add(new LoadExecutors(numThreads, filesystemLoadGenerator));
}
void startIO(long time, TimeUnit timeUnit) {
- List<CompletableFuture<Void>> writeFutures = new ArrayList<>();
- LOG.info("Starting MiniOzoneLoadGenerator for time {}:{} with {} buffers " +
- "and {} threads", time, timeUnit, numBuffers, numThreads);
- if (isIOThreadRunning.compareAndSet(false, true)) {
- // Start the IO thread
- for (int i = 0; i < numThreads; i++) {
- writeFutures.add(
- CompletableFuture.runAsync(() -> load(timeUnit.toMillis(time)),
- writeExecutor));
- }
-
- for (int i = 0; i < numThreads; i++) {
- writeFutures.add(
- CompletableFuture.runAsync(() -> startAgedLoad(
- timeUnit.toMillis(time)), agedFileExecutor));
- }
-
- for (int i = 0; i < numThreads; i++) {
- writeFutures.add(
- CompletableFuture.runAsync(() -> startFsLoad(
- timeUnit.toMillis(time)), fsExecutor));
- }
-
- // Wait for IO to complete
- for (CompletableFuture<Void> f : writeFutures) {
- try {
- f.get();
- } catch (Throwable t) {
- LOG.error("startIO failed with exception", t);
- }
- }
- }
- }
-
- public void shutdownLoadGenerator() {
- try {
- writeExecutor.shutdown();
- writeExecutor.awaitTermination(1, TimeUnit.DAYS);
- } catch (Exception e) {
- LOG.error("error while closing ", e);
- }
+ LOG.info("Starting MiniOzoneLoadGenerator for time {}:{}", time, timeUnit);
+ long runTime = timeUnit.toMillis(time);
+ // start and wait for executors to finish
+ loadExecutors.forEach(le -> le.startLoad(runTime));
+ loadExecutors.forEach(LoadExecutors::waitForCompletion);
}
- private ByteBuffer getBuffer(int keyIndex) {
- return buffers.get(keyIndex % numBuffers);
+ void shutdownLoadGenerator() {
+ loadExecutors.forEach(LoadExecutors::shutdown);
}
- private String getKeyName(int keyIndex, String threadName) {
- return threadName + keyNameDelimiter + keyIndex;
+ public static String getKeyName(int keyIndex, String prefix) {
+ return prefix + keyNameDelimiter + keyIndex;
}
Review comment:
Nit: `getKeyName` can be moved to `LoadGenerator`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]