Hi,

I think the quoted implementation of migrateSegments is overly complex. Since we apparently need (or want) to preserve the order of the segments in the archive across migration, writing the segments cannot be parallelized; only reading can. It took me a while to figure this out from the current implementation.

I would suggest implementing this by reading in parallel and collecting the respective futures in a list. Writing is then done by waiting on those futures one after another, in list order:

private void migrateSegments(
    SegmentArchiveReader reader,
    SegmentArchiveWriter writer)
throws ExecutionException, InterruptedException, IOException {

    // Submit all reads up front; the list keeps the archive order.
    List<Future<Segment>> futures = new ArrayList<>();
    for (SegmentArchiveEntry entry : reader.listSegments()) {
        futures.add(executor.submit(() -> {
            Segment segment = new Segment(entry);
            segment.read(reader);
            return segment;
        }));
    }

    // Wait in submission order, so segments are written in archive order.
    for (Future<Segment> future : futures) {
        Segment segment = future.get();
        segment.write(writer);
    }
}
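
To illustrate why this preserves the order, here is a minimal, self-contained sketch of the same pattern with plain strings standing in for segments. The class name OrderedMigrationSketch, the fakeRead helper, the sleep times and the pool size are assumptions made up for this example; they are not part of the patch or of the real Segment API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class OrderedMigrationSketch {

    // Hypothetical stand-in for segment.read(reader). Later entries sleep for
    // a shorter time, so reads tend to complete out of submission order.
    static String fakeRead(int index, int total) throws InterruptedException {
        Thread.sleep((total - index) * 10L);
        return "segment-" + index;
    }

    // Reads run in parallel; results are consumed strictly in submission order.
    static List<String> migrate(int total, ExecutorService executor)
            throws ExecutionException, InterruptedException {
        List<Future<String>> futures = new ArrayList<>();
        for (int i = 0; i < total; i++) {
            final int index = i;
            futures.add(executor.submit(() -> fakeRead(index, total)));
        }

        // Stand-in for segment.write(writer): append in list order.
        List<String> written = new ArrayList<>();
        for (Future<String> future : futures) {
            written.add(future.get());
        }
        return written;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            System.out.println(migrate(8, executor));
        } finally {
            executor.shutdown();
        }
    }
}
```

Even though the reads may finish in any order, the second loop blocks on each future in turn, so the "written" list always follows the order in which the entries were submitted.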


Michael


On 22.01.19 10:25, [email protected] wrote:
+    private void migrateSegments(SegmentArchiveReader reader, SegmentArchiveWriter writer)
+            throws InterruptedException, ExecutionException {
+        BlockingDeque<Segment> readDeque = new LinkedBlockingDeque<>(READ_THREADS);
+        BlockingDeque<Segment> writeDeque = new LinkedBlockingDeque<>(READ_THREADS);
+        AtomicBoolean processingFinished = new AtomicBoolean(false);
+        AtomicBoolean exception = new AtomicBoolean(false);
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < READ_THREADS; i++) {
+            futures.add(executor.submit(() -> {
+                try {
+                    while (!exception.get() && !(readDeque.isEmpty() && processingFinished.get())) {
+                        Segment segment = readDeque.poll(100, TimeUnit.MILLISECONDS);
+                        if (segment != null) {
+                            segment.read(reader);
+                        }
+                    }
+                    return null;
+                } catch (Exception e) {
+                    exception.set(true);
+                    throw e;
+                }
+            }));
+        }
+        futures.add(executor.submit(() -> {
+            try {
+                while (!exception.get() && !(writeDeque.isEmpty() && processingFinished.get())) {
+                    Segment segment = writeDeque.poll(100, TimeUnit.MILLISECONDS);
+                    if (segment != null) {
+                        while (segment.data == null && !exception.get()) {
+                            Thread.sleep(10);
+                        }
+                        segment.write(writer);
+                    }
+                }
+                return null;
+            } catch (Exception e) {
+                exception.set(true);
+                throw e;
+            }
+        }));
+        for (SegmentArchiveEntry entry : reader.listSegments()) {
+            Segment segment = new Segment(entry);
+            readDeque.putLast(segment);
+            writeDeque.putLast(segment);
+        }
+        processingFinished.set(true);
+        for (Future<?> future : futures) {
+            future.get();
+        }
+    }
