Hi Michael,

Thanks for suggesting the improvement. I'll re-open the issue and fix the
code accordingly.
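
For reference, here is a minimal self-contained sketch of the pattern you
describe below: reads are submitted in parallel, and the results are consumed
in submission order. It is not the Oak code. The class OrderedMigrationSketch,
the readSegment helper and the String "segments" are hypothetical stand-ins
for Segment, SegmentArchiveReader and SegmentArchiveWriter, and the random
sleep only simulates reads that finish out of order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;

class OrderedMigrationSketch {

    // Hypothetical stand-in for segment.read(reader): the random delay
    // makes the parallel reads complete in an unpredictable order.
    static String readSegment(String entryName) throws InterruptedException {
        Thread.sleep(ThreadLocalRandom.current().nextInt(20));
        return "data-of-" + entryName;
    }

    // Submits one read task per entry, then collects the results in
    // submission order. Appending to the list stands in for
    // segment.write(writer).
    static List<String> migrate(List<String> entries, ExecutorService executor)
            throws ExecutionException, InterruptedException {
        List<Future<String>> futures = new ArrayList<>();
        for (String entry : entries) {
            futures.add(executor.submit(() -> {
                return readSegment(entry);
            }));
        }

        List<String> written = new ArrayList<>();
        for (Future<String> future : futures) {
            // get() blocks until this particular read is done, so the
            // "writes" follow the order of the entries, not the order in
            // which the reads happened to finish.
            written.add(future.get());
        }
        return written;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<String> entries = Arrays.asList("s1", "s2", "s3", "s4", "s5");
            System.out.println(migrate(entries, executor));
        } finally {
            executor.shutdown();
        }
    }
}
```

One thing to keep in mind with this shape: all reads are queued up front, so
unlike the bounded deques in the current code, nothing limits how many read
segments are held in memory while the writer catches up.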

Regards,
Andrei

On Fri, Jan 25, 2019 at 12:13 PM Michael Dürig <[email protected]> wrote:

>
> Hi,
>
> I think the quoted implementation of migrateSegments is overly complex.
> Since we apparently need or want to keep the order of the segments in the
> archive across migration, there is no way to parallelize writing the
> segments. However, it took me a while to figure this out from the
> current implementation.
>
> I would suggest implementing this by reading in parallel and putting the
> respective futures into a list. Writing is then done by waiting on those
> futures one after another, in list order:
>
> private void migrateSegments(
>      SegmentArchiveReader reader,
>      SegmentArchiveWriter writer)
> throws ExecutionException, InterruptedException, IOException {
>
>      List<Future<Segment>> futures = new ArrayList<>();
>      for (SegmentArchiveEntry entry : reader.listSegments()) {
>          futures.add(executor.submit(() -> {
>              Segment segment = new Segment(entry);
>              segment.read(reader);
>              return segment;
>          }));
>      }
>
>      for (Future<Segment> future : futures) {
>          Segment segment = future.get();
>          segment.write(writer);
>      }
> }
>
>
> Michael
>
>
> On 22.01.19 10:25, [email protected] wrote:
> > +    private void migrateSegments(SegmentArchiveReader reader, SegmentArchiveWriter writer)
> > +            throws InterruptedException, ExecutionException {
> > +        BlockingDeque<Segment> readDeque = new LinkedBlockingDeque<>(READ_THREADS);
> > +        BlockingDeque<Segment> writeDeque = new LinkedBlockingDeque<>(READ_THREADS);
> > +        AtomicBoolean processingFinished = new AtomicBoolean(false);
> > +        AtomicBoolean exception = new AtomicBoolean(false);
> > +        List<Future<?>> futures = new ArrayList<>();
> > +        for (int i = 0; i < READ_THREADS; i++) {
> > +            futures.add(executor.submit(() -> {
> > +                try {
> > +                    while (!exception.get() && !(readDeque.isEmpty() && processingFinished.get())) {
> > +                        Segment segment = readDeque.poll(100, TimeUnit.MILLISECONDS);
> > +                        if (segment != null) {
> > +                            segment.read(reader);
> > +                        }
> > +                    }
> > +                    return null;
> > +                } catch (Exception e) {
> > +                    exception.set(true);
> > +                    throw e;
> > +                }
> > +            }));
> > +        }
> > +        futures.add(executor.submit(() -> {
> > +            try {
> > +                while (!exception.get() && !(writeDeque.isEmpty() && processingFinished.get())) {
> > +                    Segment segment = writeDeque.poll(100, TimeUnit.MILLISECONDS);
> > +                    if (segment != null) {
> > +                        while (segment.data == null && !exception.get()) {
> > +                            Thread.sleep(10);
> > +                        }
> > +                        segment.write(writer);
> > +                    }
> > +                }
> > +                return null;
> > +            } catch (Exception e) {
> > +                exception.set(true);
> > +                throw e;
> > +            }
> > +        }));
> > +        for (SegmentArchiveEntry entry : reader.listSegments()) {
> > +            Segment segment = new Segment(entry);
> > +            readDeque.putLast(segment);
> > +            writeDeque.putLast(segment);
> > +        }
> > +        processingFinished.set(true);
> > +        for (Future<?> future : futures) {
> > +            future.get();
> > +        }
> > +    }
>
