Hi,
I think the quoted implementation of migrateSegments is overly complex.
Since we apparently need / want to keep the order of the segments in the
archive across migration there is no way to parallelize writing the
segments. However, this took me a while to figure out looking at the
current implementation.
I would suggest to implement this by reading in parallel and putting
respective futures into a list. Writing is then done by subsequently
waiting on those futures for completion:
private void migrateSegments(
SegmentArchiveReader reader,
SegmentArchiveWriter writer)
throws ExecutionException, InterruptedException, IOException {
List<Future<Segment>> futures = new ArrayList<>();
for (SegmentArchiveEntry entry : reader.listSegments()) {
futures.add(executor.submit(() -> {
Segment segment = new Segment(entry);
segment.read(reader);
return segment;
}));
}
for (Future<Segment> future : futures) {
Segment segment = future.get();
segment.write(writer);
}
}
Michael
On 22.01.19 10:25, [email protected] wrote:
+ private void migrateSegments(SegmentArchiveReader reader,
SegmentArchiveWriter writer)
+ throws InterruptedException, ExecutionException {
+ BlockingDeque<Segment> readDeque = new
LinkedBlockingDeque<>(READ_THREADS);
+ BlockingDeque<Segment> writeDeque = new
LinkedBlockingDeque<>(READ_THREADS);
+ AtomicBoolean processingFinished = new AtomicBoolean(false);
+ AtomicBoolean exception = new AtomicBoolean(false);
+ List<Future<?>> futures = new ArrayList<>();
+ for (int i = 0; i < READ_THREADS; i++) {
+ futures.add(executor.submit(() -> {
+ try {
+ while (!exception.get() && !(readDeque.isEmpty() &&
processingFinished.get())) {
+ Segment segment = readDeque.poll(100,
TimeUnit.MILLISECONDS);
+ if (segment != null) {
+ segment.read(reader);
+ }
+ }
+ return null;
+ } catch (Exception e) {
+ exception.set(true);
+ throw e;
+ }
+ }));
+ }
+ futures.add(executor.submit(() -> {
+ try {
+ while (!exception.get() && !(writeDeque.isEmpty() &&
processingFinished.get())) {
+ Segment segment = writeDeque.poll(100,
TimeUnit.MILLISECONDS);
+ if (segment != null) {
+ while (segment.data == null && !exception.get()) {
+ Thread.sleep(10);
+ }
+ segment.write(writer);
+ }
+ }
+ return null;
+ } catch (Exception e) {
+ exception.set(true);
+ throw e;
+ }
+ }));
+ for (SegmentArchiveEntry entry : reader.listSegments()) {
+ Segment segment = new Segment(entry);
+ readDeque.putLast(segment);
+ writeDeque.putLast(segment);
+ }
+ processingFinished.set(true);
+ for (Future<?> future : futures) {
+ future.get();
+ }
+ }