Hi Michael, Thanks for suggesting the improvement. I'll re-open the issue and fix the code accordingly.
Regards, Andrei On Fri, Jan 25, 2019 at 12:13 PM Michael Dürig <[email protected]> wrote: > > Hi, > > I think the quoted implementation of migrateSegments is overly complex. > Since we apparently need / want to keep the order of the segments in the > archive across migration there is no way to parallelize writing the > segments. However, this took me a while to figure out looking at the > current implementation. > > I would suggest to implement this by reading in parallel and putting > respective futures into a list. Writing is then done by subsequently > waiting on those futures for completion: > > private void migrateSegments( > SegmentArchiveReader reader, > SegmentArchiveWriter writer) > throws ExecutionException, InterruptedException, IOException { > > List<Future<Segment>> futures = new ArrayList<>(); > for (SegmentArchiveEntry entry : reader.listSegments()) { > futures.add(executor.submit(() -> { > Segment segment = new Segment(entry); > segment.read(reader); > return segment; > })); > } > > for (Future<Segment> future : futures) { > Segment segment = future.get(); > segment.write(writer); > } > } > > > Michael > > > On 22.01.19 10:25, [email protected] wrote: > > + private void migrateSegments(SegmentArchiveReader reader, > SegmentArchiveWriter writer) > > + throws InterruptedException, ExecutionException { > > + BlockingDeque<Segment> readDeque = new > LinkedBlockingDeque<>(READ_THREADS); > > + BlockingDeque<Segment> writeDeque = new > LinkedBlockingDeque<>(READ_THREADS); > > + AtomicBoolean processingFinished = new AtomicBoolean(false); > > + AtomicBoolean exception = new AtomicBoolean(false); > > + List<Future<?>> futures = new ArrayList<>(); > > + for (int i = 0; i < READ_THREADS; i++) { > > + futures.add(executor.submit(() -> { > > + try { > > + while (!exception.get() && !(readDeque.isEmpty() && > processingFinished.get())) { > > + Segment segment = readDeque.poll(100, > TimeUnit.MILLISECONDS); > > + if (segment != null) { > > + segment.read(reader); > > + } > > + } > > + return null; > > + } catch (Exception e) { > > + exception.set(true); > > + throw e; > > + } > > + })); > > + } > > + futures.add(executor.submit(() -> { > > + try { > > + while (!exception.get() && !(writeDeque.isEmpty() && > processingFinished.get())) { > > + Segment segment = writeDeque.poll(100, > TimeUnit.MILLISECONDS); > > + if (segment != null) { > > + while (segment.data == null && > !exception.get()) { > > + Thread.sleep(10); > > + } > > + segment.write(writer); > > + } > > + } > > + return null; > > + } catch (Exception e) { > > + exception.set(true); > > + throw e; > > + } > > + })); > > + for (SegmentArchiveEntry entry : reader.listSegments()) { > > + Segment segment = new Segment(entry); > > + readDeque.putLast(segment); > > + writeDeque.putLast(segment); > > + } > > + processingFinished.set(true); > > + for (Future<?> future : futures) { > > + future.get(); > > + } > > + } >
