This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit b077831a5b28cf5319f6d23fb02bf1203281189b Author: Benoit Tellier <[email protected]> AuthorDate: Wed Apr 12 15:23:45 2023 +0700 [PERF] ImapRequestFrameDecoder: move file creation out of the Netty event loop IMAP implementation buffers large mails into files. Priori to this change, the file creation was performed on the Netty event loop thread. This is problematic as file creation is blocking, it would result into the Netty event loop stalling, especially when the file system is overloaded. Also, in case of errors / connection closure during the buffering, we want to ensure that the intermediate temporary file is well disposed of. (not the case previously) --- .../imapserver/netty/ImapRequestFrameDecoder.java | 166 ++++++++++++++------- 1 file changed, 115 insertions(+), 51 deletions(-) diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java index 02505969a2..4adab34f09 100644 --- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java +++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java @@ -34,6 +34,8 @@ import java.util.Optional; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.function.Consumer; import org.apache.commons.lang3.tuple.Pair; import org.apache.james.imap.api.ImapMessage; @@ -44,6 +46,7 @@ import org.apache.james.imap.decode.ImapDecoder; import org.apache.james.imap.decode.ImapRequestLineReader; import org.apache.james.protocols.netty.LineHandlerAware; +import com.github.fge.lambdas.Throwing; import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; @@ -53,6 +56,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.ByteToMessageDecoder; import reactor.core.Disposable; +import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; import reactor.core.scheduler.Schedulers; @@ -64,13 +68,9 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net @VisibleForTesting static final String NEEDED_DATA = "NEEDED_DATA"; private static final boolean RETRY = true; - private static final String STORED_DATA = "STORED_DATA"; - private static final String WRITTEN_DATA = "WRITTEN_DATA"; - private static final String OUTPUT_STREAM = "OUTPUT_STREAM"; private static final String SINK = "SINK"; private static final String SUBSCRIPTION = "SUBSCRIPTION"; - private final ImapDecoder decoder; private final int inMemorySizeLimit; private final int literalSizeLimit; @@ -91,6 +91,16 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net super.channelActive(ctx); } + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + Object subscription = ctx.channel().attr(FRAME_DECODE_ATTACHMENT_ATTRIBUTE_KEY).get() + .get(SUBSCRIPTION); + if (subscription instanceof Disposable) { + ((Disposable) subscription).dispose(); + } + super.channelInactive(ctx); + } + @Override @SuppressWarnings("unchecked") protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { @@ -195,73 +205,127 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net } private void uploadToAFile(ChannelHandlerContext ctx, ByteBuf in, Map<String, Object> attachment, int size, int readerIndex) throws IOException { - final File f; Sinks.Many<byte[]> sink; - OutputStream outputStream; // check if we have created a temporary file already or if // we need to create a new one - if (attachment.containsKey(STORED_DATA)) { + if (attachment.containsKey(SINK)) { sink = (Sinks.Many<byte[]>) attachment.get(SINK); } else { - f = Files.createTempFile("imap-literal", ".tmp").toFile(); - attachment.put(STORED_DATA, f); - final AtomicInteger written = new AtomicInteger(0); - attachment.put(WRITTEN_DATA, written); - outputStream = new FileOutputStream(f, true); - attachment.put(OUTPUT_STREAM, outputStream); sink = Sinks.many().unicast().onBackpressureBuffer(); attachment.put(SINK, sink); - Disposable subscribe = sink.asFlux() - .publishOn(Schedulers.boundedElastic()) - .doOnNext(next -> { - try { - int amount = Math.min(next.length, size - written.get()); - outputStream.write(next, 0, amount); - written.addAndGet(amount); - } catch (Exception e) { - try { - outputStream.close(); - } catch (IOException ignored) { - //ignore exception during close - } - throw new RuntimeException(e); - } + FileChunkConsumer fileChunkConsumer = new FileChunkConsumer(size, + (file, written) -> { + ImapRequestLineReader reader = new NettyStreamImapRequestLineReader(ctx.channel(), file, RETRY); - // Check if all needed data was streamed to the file. - if (written.get() == size) { - try { - outputStream.close(); - } catch (IOException ignored) { - //ignore exception during close - } - - ImapRequestLineReader reader = new NettyStreamImapRequestLineReader(ctx.channel(), f, RETRY); - - try { - parseImapMessage(ctx, null, attachment, Pair.of(reader, written.get()), readerIndex) - .ifPresent(ctx::fireChannelRead); - } catch (DecodingException e) { - ctx.fireExceptionCaught(e); - } + try { + parseImapMessage(ctx, null, attachment, Pair.of(reader, size), readerIndex) + .ifPresent(ctx::fireChannelRead); + } catch (DecodingException e) { + ctx.fireExceptionCaught(e); } - }) - .subscribe(o -> { - + }); + Disposable subscribe = sink.asFlux() + .publishOn(Schedulers.boundedElastic()) + .subscribe(fileChunkConsumer, + e -> { + fileChunkConsumer.discard(); + ctx.fireExceptionCaught(e); }, - ctx::fireExceptionCaught, () -> { }); - attachment.put(SUBSCRIPTION, subscribe); + attachment.put(SUBSCRIPTION, (Disposable) () -> { + subscribe.dispose(); + fileChunkConsumer.discard(); + }); } - final int readableBytes = in.readableBytes(); - final byte[] bytes = new byte[readableBytes]; + int readableBytes = in.readableBytes(); + byte[] bytes = new byte[readableBytes]; in.readBytes(bytes); sink.emitNext(bytes, FAIL_FAST); } + static class FileChunkConsumer implements Consumer<byte[]> { + private final int size; + private final AtomicInteger written = new AtomicInteger(0); + private final BiConsumer<File, Integer> callback; + private final AtomicBoolean initialized = new AtomicBoolean(false); + private OutputStream outputStream; + private File f; + + FileChunkConsumer(int size, BiConsumer<File, Integer> callback) { + this.size = size; + this.callback = callback; + } + + @Override + public void accept(byte[] next) { + if (!initialized.get()) { + initialize(); + } + + writeChunk(next); + + // Check if all needed data was streamed to the file. + if (isComplete()) { + finalizeDataTransfer(); + } + } + + private void initialize() { + try { + f = Files.createTempFile("imap-literal", ".tmp").toFile(); + outputStream = new FileOutputStream(f, true); + initialized.set(true); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void writeChunk(byte[] next) { + try { + int amount = Math.min(next.length, size - written.get()); + outputStream.write(next, 0, amount); + written.addAndGet(amount); + } catch (Exception e) { + try { + outputStream.close(); + } catch (IOException ignored) { + //ignore exception during close + } + throw new RuntimeException(e); + } + } + + private boolean isComplete() { + return written.get() == size; + } + + private void finalizeDataTransfer() { + try { + outputStream.close(); + } catch (IOException ignored) { + //ignore exception during close + } + + callback.accept(f, written.get()); + } + + void discard() { + Mono.fromRunnable(Throwing.runnable(() -> { + if (outputStream != null) { + outputStream.close(); + } + if (f != null) { + Files.delete(f.toPath()); + } + })).subscribeOn(Schedulers.boundedElastic()) + .subscribe(); + } + } + public void disableFraming(ChannelHandlerContext ctx) { if (framingEnabled.getAndSet(false)) { ctx.channel().pipeline().remove(FRAMER); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
