This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit daf3d4bba15ecde6f42e235236204e98523a545e Author: Tung Van TRAN <[email protected]> AuthorDate: Wed Mar 9 17:04:00 2022 +0700 JAMES-3724 - Implement LeakAware --- .../core/BufferedDeferredFileOutputStream.java | 9 +- .../server/core/MimeMessageInputStreamSource.java | 128 +++++++------ .../james/server/core/MimeMessageWrapper.java | 2 +- .../core/MimeMessageInputStreamSourceTest.java | 6 +- .../james/server/core/MimeMessageWrapperTest.java | 6 +- server/container/lifecycle-api/pom.xml | 5 + .../org/apache/james/lifecycle/api/Disposable.java | 177 ++++++++++++++++++ .../apache/james/lifecycle/api/LeakAwareTest.java | 200 +++++++++++++++++++++ .../james/smtpserver/JamesDataCmdHandler.java | 2 +- 9 files changed, 470 insertions(+), 65 deletions(-) diff --git a/server/container/core/src/main/java/org/apache/james/server/core/BufferedDeferredFileOutputStream.java b/server/container/core/src/main/java/org/apache/james/server/core/BufferedDeferredFileOutputStream.java index 761f9b1..b5e3f1e 100644 --- a/server/container/core/src/main/java/org/apache/james/server/core/BufferedDeferredFileOutputStream.java +++ b/server/container/core/src/main/java/org/apache/james/server/core/BufferedDeferredFileOutputStream.java @@ -30,6 +30,7 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.commons.io.output.DeferredFileOutputStream; import org.apache.commons.io.output.ThresholdingOutputStream; +import org.apache.james.lifecycle.api.Disposable; /** * An almost copy of {@link DeferredFileOutputStream} with buffered file stream. @@ -46,7 +47,7 @@ import org.apache.commons.io.output.ThresholdingOutputStream; * * @link https://issues.apache.org/jira/browse/JAMES-2343 */ -public class BufferedDeferredFileOutputStream extends ThresholdingOutputStream { +public class BufferedDeferredFileOutputStream extends ThresholdingOutputStream implements Disposable { /** * The output stream to which data will be written prior to the theshold @@ -261,4 +262,10 @@ public class BufferedDeferredFileOutputStream extends ThresholdingOutputStream { } } } + + @Override + public void dispose() { + // Fasten GC of the big byte array + memoryOutputStream = null; + } } \ No newline at end of file diff --git a/server/container/core/src/main/java/org/apache/james/server/core/MimeMessageInputStreamSource.java b/server/container/core/src/main/java/org/apache/james/server/core/MimeMessageInputStreamSource.java index aec61fa..4419909 100644 --- a/server/container/core/src/main/java/org/apache/james/server/core/MimeMessageInputStreamSource.java +++ b/server/container/core/src/main/java/org/apache/james/server/core/MimeMessageInputStreamSource.java @@ -44,7 +44,7 @@ import org.apache.james.util.SizeFormat; * * This class is not thread safe! */ -public class MimeMessageInputStreamSource extends Disposable.LeakAware implements MimeMessageSource, Disposable { +public class MimeMessageInputStreamSource extends Disposable.LeakAware<MimeMessageInputStreamSource.Resource> implements MimeMessageSource { /** * 100kb threshold for the stream. */ @@ -58,14 +58,6 @@ public class MimeMessageInputStreamSource extends Disposable.LeakAware implement } private static final int THRESHOLD = threshold(); - - private final Set<InputStream> streams = new HashSet<>(); - - /** - * A temporary file used to hold the message stream - */ - private BufferedDeferredFileOutputStream out; - /** * The full path of the temporary file */ @@ -76,6 +68,62 @@ public class MimeMessageInputStreamSource extends Disposable.LeakAware implement */ private static final File TMPDIR = new File(System.getProperty("java.io.tmpdir")); + static class Resource extends LeakAware.Resource { + private final BufferedDeferredFileOutputStream out; + private final Set<InputStream> streams; + + Resource(BufferedDeferredFileOutputStream out, Set<InputStream> streams) { + super(() -> { + // explicit close all streams + for (InputStream stream : streams) { + try { + stream.close(); + } catch (IOException e) { + //ignore exception during close + } + } + + if (out != null) { + try { + out.close(); + } catch (IOException e) { + //ignore exception during close + } + File file = out.getFile(); + if (file != null) { + FileUtils.deleteQuietly(file); + file = null; + } + out.dispose(); + } + }); + this.out = out; + this.streams = streams; + } + + public BufferedDeferredFileOutputStream getOut() { + return out; + } + + + } + + public static MimeMessageInputStreamSource create(String key, InputStream in) throws MessagingException { + Disposable.LeakAware.track(); + BufferedDeferredFileOutputStream out = new BufferedDeferredFileOutputStream(THRESHOLD, "mimemessage-" + key, ".m64", TMPDIR); + Resource resource = new Resource(out, new HashSet<>()); + + return new MimeMessageInputStreamSource(resource, key, in); + } + + public static MimeMessageInputStreamSource create(String key) { + Disposable.LeakAware.track(); + BufferedDeferredFileOutputStream out = new BufferedDeferredFileOutputStream(THRESHOLD, "mimemessage-" + key, ".m64", TMPDIR); + Resource resource = new Resource(out, new HashSet<>()); + + return new MimeMessageInputStreamSource(resource, key); + } + /** * Construct a new MimeMessageInputStreamSource from an * <code>InputStream</code> that contains the bytes of a MimeMessage. @@ -84,24 +132,23 @@ public class MimeMessageInputStreamSource extends Disposable.LeakAware implement * @param in the stream containing the MimeMessage * @throws MessagingException if an error occurs while trying to store the stream */ - public MimeMessageInputStreamSource(String key, InputStream in) throws MessagingException { - super(); + private MimeMessageInputStreamSource(Resource resource, String key, InputStream in) throws MessagingException { + super(resource); // We want to immediately read this into a temporary file // Create a temp file and channel the input stream into it try { - out = new BufferedDeferredFileOutputStream(THRESHOLD, "mimemessage-" + key, ".m64", TMPDIR); - IOUtils.copy(in, out); + IOUtils.copy(in, resource.out); sourceId = key; } catch (IOException ioe) { - File file = out.getFile(); + File file = resource.out.getFile(); if (file != null) { FileUtils.deleteQuietly(file); } throw new MessagingException("Unable to retrieve the data: " + ioe.getMessage(), ioe); } finally { try { - if (out != null) { - out.close(); + if (resource.out != null) { + resource.out.close(); } } catch (IOException ioe) { // Ignored - logging unavailable to log this non-fatal error. @@ -118,9 +165,8 @@ public class MimeMessageInputStreamSource extends Disposable.LeakAware implement } } - public MimeMessageInputStreamSource(String key) { - super(); - out = new BufferedDeferredFileOutputStream(THRESHOLD, key, ".m64", TMPDIR); + private MimeMessageInputStreamSource(Resource resource, String key) { + super(resource); sourceId = key; } @@ -142,12 +188,12 @@ public class MimeMessageInputStreamSource extends Disposable.LeakAware implement @Override public InputStream getInputStream() throws IOException { InputStream in; - if (out.isInMemory()) { - in = new SharedByteArrayInputStream(out.getData()); + if (getResource().getOut().isInMemory()) { + in = new SharedByteArrayInputStream(getResource().getOut().getData()); } else { - in = new SharedFileInputStream(out.getFile()); + in = new SharedFileInputStream(getResource().getOut().getFile()); } - streams.add(in); + getResource().streams.add(in); return in; } @@ -155,43 +201,13 @@ public class MimeMessageInputStreamSource extends Disposable.LeakAware implement * Get the size of the temp file * * @return the size of the temp file - * @throws IOException if an error is encoutered while computing the size of the - * message */ @Override - public long getMessageSize() throws IOException { - return out.getByteCount(); + public long getMessageSize() { + return getResource().getOut().getByteCount(); } public OutputStream getWritableOutputStream() { - return out; - } - - @Override - public void dispose() { - // explicit close all streams - for (InputStream stream : streams) { - try { - stream.close(); - } catch (IOException e) { - //ignore exception during close - } - } - - if (out != null) { - try { - out.close(); - } catch (IOException e) { - //ignore exception during close - } - File file = out.getFile(); - if (file != null) { - FileUtils.deleteQuietly(file); - file = null; - } - out = null; - } - disposed(); + return getResource().getOut(); } - } diff --git a/server/container/core/src/main/java/org/apache/james/server/core/MimeMessageWrapper.java b/server/container/core/src/main/java/org/apache/james/server/core/MimeMessageWrapper.java index 5255387..99bd591 100644 --- a/server/container/core/src/main/java/org/apache/james/server/core/MimeMessageWrapper.java +++ b/server/container/core/src/main/java/org/apache/james/server/core/MimeMessageWrapper.java @@ -153,7 +153,7 @@ public class MimeMessageWrapper extends MimeMessage implements Disposable { in.close(); saved = true; } else { - MimeMessageInputStreamSource src = new MimeMessageInputStreamSource("MailCopy-" + UUID.randomUUID().toString()); + MimeMessageInputStreamSource src = MimeMessageInputStreamSource.create("MailCopy-" + UUID.randomUUID().toString()); OutputStream out = src.getWritableOutputStream(); original.writeTo(out); out.close(); diff --git a/server/container/core/src/test/java/org/apache/james/server/core/MimeMessageInputStreamSourceTest.java b/server/container/core/src/test/java/org/apache/james/server/core/MimeMessageInputStreamSourceTest.java index e98f597..b870aa7 100644 --- a/server/container/core/src/test/java/org/apache/james/server/core/MimeMessageInputStreamSourceTest.java +++ b/server/container/core/src/test/java/org/apache/james/server/core/MimeMessageInputStreamSourceTest.java @@ -41,20 +41,20 @@ public class MimeMessageInputStreamSourceTest { @Test public void streamWith1MBytesShouldBeReadable() throws MessagingException, IOException { - testee = new MimeMessageInputStreamSource("myKey", new ZeroedInputStream(_1M)); + testee = MimeMessageInputStreamSource.create("myKey", new ZeroedInputStream(_1M)); assertThat(testee.getInputStream()).hasSameContentAs(new ZeroedInputStream(_1M)); } @Test public void streamWith10KBytesShouldBeReadable() throws MessagingException, IOException { - testee = new MimeMessageInputStreamSource("myKey", new ZeroedInputStream(_10KB)); + testee = MimeMessageInputStreamSource.create("myKey", new ZeroedInputStream(_10KB)); assertThat(testee.getInputStream()).hasSameContentAs(new ZeroedInputStream(_10KB)); } @Test public void streamWithVeryShortNameShouldWork() throws MessagingException, IOException { String veryShortName = "1"; - testee = new MimeMessageInputStreamSource(veryShortName, new ZeroedInputStream(_1M)); + testee = MimeMessageInputStreamSource.create(veryShortName, new ZeroedInputStream(_1M)); assertThat(testee.getInputStream()).isNotNull(); } } diff --git a/server/container/core/src/test/java/org/apache/james/server/core/MimeMessageWrapperTest.java b/server/container/core/src/test/java/org/apache/james/server/core/MimeMessageWrapperTest.java index 9686fe2..5c5ca59 100644 --- a/server/container/core/src/test/java/org/apache/james/server/core/MimeMessageWrapperTest.java +++ b/server/container/core/src/test/java/org/apache/james/server/core/MimeMessageWrapperTest.java @@ -92,7 +92,7 @@ public class MimeMessageWrapperTest extends MimeMessageFromStreamTest { @Override protected TestableMimeMessageWrapper getMessageFromSources(String sources) throws Exception { - MimeMessageInputStreamSource mmis = new MimeMessageInputStreamSource("test", new SharedByteArrayInputStream(sources.getBytes())); + MimeMessageInputStreamSource mmis = MimeMessageInputStreamSource.create("test", new SharedByteArrayInputStream(sources.getBytes())); return new TestableMimeMessageWrapper(mmis); } @@ -348,7 +348,7 @@ public class MimeMessageWrapperTest extends MimeMessageFromStreamTest { @Test public void getMessageSizeShouldBeAccurateWhenHeadersAreModified() throws Exception { - MimeMessageWrapper wrapper = new MimeMessageWrapper(new MimeMessageInputStreamSource(MailImpl.getId(), + MimeMessageWrapper wrapper = new MimeMessageWrapper(MimeMessageInputStreamSource.create(MailImpl.getId(), ClassLoaderUtils.getSystemResourceAsSharedStream("JAMES-1593.eml"))); wrapper.setHeader("header", "vss"); @@ -358,7 +358,7 @@ public class MimeMessageWrapperTest extends MimeMessageFromStreamTest { @Test public void getMessageSizeShouldBeAccurateWhenHeadersAreModifiedAndOtherEncoding() throws Exception { - MimeMessageWrapper wrapper = new MimeMessageWrapper(new MimeMessageInputStreamSource(MailImpl.getId(), + MimeMessageWrapper wrapper = new MimeMessageWrapper(MimeMessageInputStreamSource.create(MailImpl.getId(), ClassLoaderUtils.getSystemResourceAsSharedStream("mail-containing-unicode-characters.eml"))); wrapper.setHeader("header", "vss"); diff --git a/server/container/lifecycle-api/pom.xml b/server/container/lifecycle-api/pom.xml index 8a56000..15fc676 100644 --- a/server/container/lifecycle-api/pom.xml +++ b/server/container/lifecycle-api/pom.xml @@ -47,6 +47,11 @@ <artifactId>commons-configuration2</artifactId> </dependency> <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> diff --git a/server/container/lifecycle-api/src/main/java/org/apache/james/lifecycle/api/Disposable.java b/server/container/lifecycle-api/src/main/java/org/apache/james/lifecycle/api/Disposable.java index 1e99a40..bd69789 100644 --- a/server/container/lifecycle-api/src/main/java/org/apache/james/lifecycle/api/Disposable.java +++ b/server/container/lifecycle-api/src/main/java/org/apache/james/lifecycle/api/Disposable.java @@ -19,6 +19,18 @@ package org.apache.james.lifecycle.api; +import java.lang.ref.PhantomReference; +import java.lang.ref.Reference; +import java.lang.ref.ReferenceQueue; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Classes which implement this interface need some special handling on destroy. * So the {@link #dispose()} method need to get called @@ -29,4 +41,169 @@ public interface Disposable { * Dispose the object */ void dispose(); + + abstract class LeakAware<T extends LeakAware.Resource> implements Disposable { + public static class Resource implements Disposable { + private final AtomicBoolean isDisposed = new AtomicBoolean(false); + private final Disposable cleanup; + + public Resource(Disposable cleanup) { + this.cleanup = cleanup; + } + + public boolean isDisposed() { + return isDisposed.get(); + } + + @Override + public void dispose() { + isDisposed.set(true); + cleanup.dispose(); + } + } + + public static class LeakDetectorException extends RuntimeException { + public LeakDetectorException() { + super(); + } + } + + public enum Level { + NONE, + SIMPLE, + ADVANCED, + TESTING; + + static Level parse(String input) { + for (Level level : values()) { + if (level.name().equalsIgnoreCase(input)) { + return level; + } + } + throw new IllegalArgumentException(String.format("Unknown level `%s`", input)); + } + } + + public static final ReferenceQueue<LeakAware<?>> REFERENCE_QUEUE = new ReferenceQueue<>(); + public static final ConcurrentHashMap<LeakAwareFinalizer, Boolean> REFERENCES_IN_USE = new ConcurrentHashMap<>(); + public static final Level LEVEL = Optional.ofNullable(System.getProperty("james.ligecycle.leak.detection.mode")) + .map(Level::parse).orElse(Level.SIMPLE); + + public static void track() { + Reference<?> referenceFromQueue; + while ((referenceFromQueue = REFERENCE_QUEUE.poll()) != null) { + if (leakDetectorEnabled()) { + ((LeakAwareFinalizer) referenceFromQueue).detectLeak(); + } + referenceFromQueue.clear(); + } + } + + private static boolean leakDetectorEnabled() { + return LEVEL != Level.NONE; + } + + public static boolean tracedEnabled() { + return LEVEL == Level.ADVANCED || LEVEL == Level.TESTING; + } + + private final T resource; + private LeakAwareFinalizer finalizer; + + protected LeakAware(T resource) { + this.resource = resource; + if (leakDetectorEnabled()) { + this.finalizer = new LeakAwareFinalizer(this, resource, REFERENCE_QUEUE); + REFERENCES_IN_USE.put(finalizer, true); + } + } + + @Override + public void dispose() { + if (finalizer != null) { + REFERENCES_IN_USE.remove(finalizer); + } + resource.dispose(); + } + + public T getResource() { + return resource; + } + } + + class TraceRecord { + private final List<StackWalker.StackFrame> stackFrames; + + TraceRecord(List<StackWalker.StackFrame> stackFrames) { + this.stackFrames = stackFrames; + } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(); + this.stackFrames.subList(3, this.stackFrames.size()) + .forEach(stackFrame -> { + buf.append("\t"); + buf.append(stackFrame.getClassName()); + buf.append("#"); + buf.append(stackFrame.getMethodName()); + buf.append(":"); + buf.append(stackFrame.getLineNumber()); + buf.append("\n"); + }); + return buf.toString(); + } + } + + class LeakAwareFinalizer extends PhantomReference<LeakAware<?>> { + private static final Logger LOGGER = LoggerFactory.getLogger(LeakAwareFinalizer.class); + + private final LeakAware.Resource resource; + private TraceRecord traceRecord; + + public LeakAwareFinalizer(LeakAware referent, LeakAware.Resource resource, ReferenceQueue<? super LeakAware<?>> q) { + super(referent, q); + this.resource = resource; + if (LeakAware.tracedEnabled()) { + traceRecord = new TraceRecord(StackWalker.getInstance().walk(s -> s.collect(Collectors.toList()))); + } + } + + public void detectLeak() { + switch (LeakAware.LEVEL) { + case NONE: // nothing + break; + case SIMPLE: + case ADVANCED: { + if (isNotDisposed()) { + errorLog(); + resource.dispose(); + LeakAware.REFERENCES_IN_USE.remove(this); + } + break; + } + case TESTING: { + if (isNotDisposed()) { + errorLog(); + throw new LeakAware.LeakDetectorException(); + } + } + } + } + + public void errorLog() { + if (LeakAware.tracedEnabled()) { + LOGGER.error("Leak detected! Resource {} was not released before its referent was garbage-collected. \n" + + "This resource was instanced at: \n{}", resource, traceRecord.toString()); + } else { + LOGGER.error("Leak detected! Resource {} was not released before its referent was garbage-collected. \n" + + "Resource management needs to be reviewed: ensure to always call dispose() for disposable objects you work with. \n" + + "Consider enabling advanced leak detection to further identify the problem.", resource); + } + } + + private boolean isNotDisposed() { + return !resource.isDisposed(); + } + } } diff --git a/server/container/lifecycle-api/src/test/java/org/apache/james/lifecycle/api/LeakAwareTest.java b/server/container/lifecycle-api/src/test/java/org/apache/james/lifecycle/api/LeakAwareTest.java new file mode 100644 index 0000000..b774024 --- /dev/null +++ b/server/container/lifecycle-api/src/test/java/org/apache/james/lifecycle/api/LeakAwareTest.java @@ -0,0 +1,200 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.lifecycle.api; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS; +import static org.awaitility.Durations.TEN_SECONDS; +import static org.apache.james.lifecycle.api.Disposable.LeakAware; + +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.awaitility.Awaitility; +import org.awaitility.core.ConditionFactory; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.slf4j.LoggerFactory; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.read.ListAppender; + +class LeakAwareTest { + + private static final class LeakResourceSample extends LeakAware<LeakResourceSample.TestResource> { + static class TestResource extends LeakAware.Resource { + public TestResource(Disposable cleanup) { + super(cleanup); + } + } + + public static LeakResourceSample create(AtomicBoolean atomicBoolean) { + return new LeakResourceSample(new TestResource(() -> atomicBoolean.set(true))); + } + + LeakResourceSample(TestResource resource) { + super(resource); + } + } + + private static final ConditionFactory awaitAtMostTenSeconds = Awaitility + .with().pollInterval(ONE_HUNDRED_MILLISECONDS) + .and().pollDelay(ONE_HUNDRED_MILLISECONDS) + .await() + .atMost(TEN_SECONDS); + + public static ListAppender<ILoggingEvent> getListAppenderForClass(Class clazz) { + Logger logger = (Logger) LoggerFactory.getLogger(clazz); + + ListAppender<ILoggingEvent> loggingEventListAppender = new ListAppender<>(); + loggingEventListAppender.start(); + + logger.addAppender(loggingEventListAppender); + return loggingEventListAppender; + } + + private void forceChangeLevel(String level) throws NoSuchFieldException, IllegalAccessException { + forceChangeLevel(LeakAware.Level.parse(level)); + } + + // using reflect to change LeakAware.LEVEL value + private static void forceChangeLevel(LeakAware.Level level) throws NoSuchFieldException, IllegalAccessException { + final Field field = LeakAware.class.getDeclaredField("LEVEL"); + field.setAccessible(true); + final Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); + field.set(null, level); + } + + @Test + void leakDetectionShouldCloseUnclosedResources() { + AtomicBoolean atomicBoolean = new AtomicBoolean(false); + LeakResourceSample resourceSample = LeakResourceSample.create(atomicBoolean); + resourceSample = null; + + System.gc(); + awaitAtMostTenSeconds.until(() -> { + LeakAware.track(); + return atomicBoolean.get(); + }); + } + + @Test + void leakDetectionShouldNotReportClosedObjects() { + AtomicBoolean atomicBoolean = new AtomicBoolean(false); + LeakResourceSample resourceSample = LeakResourceSample.create(atomicBoolean); + resourceSample.dispose(); + atomicBoolean.set(false); + resourceSample = null; + + System.gc(); + awaitAtMostTenSeconds.until(() -> { + LeakAware.track(); + return !atomicBoolean.get(); + }); + } + + @Test + void resourceShouldNotBeDetectedLeakWhenLevelIsNone() throws InterruptedException, NoSuchFieldException, IllegalAccessException { + forceChangeLevel(LeakAware.Level.NONE); + AtomicBoolean atomicBoolean = new AtomicBoolean(false); + LeakResourceSample resourceSample = LeakResourceSample.create(atomicBoolean); + resourceSample = null; + + System.gc(); + Thread.sleep(500); + LeakAware.track(); + assertThat(atomicBoolean.get()).isFalse(); + } + + @ParameterizedTest + @ValueSource(strings = {"simple", "advanced"}) + void leakDetectionShouldLogWhenDetected(String level) throws NoSuchFieldException, IllegalAccessException { + forceChangeLevel(level); + ListAppender<ILoggingEvent> loggingEvents = getListAppenderForClass(Disposable.LeakAwareFinalizer.class); + + AtomicBoolean atomicBoolean = new AtomicBoolean(false); + LeakResourceSample resourceSample = LeakResourceSample.create(atomicBoolean); + resourceSample = null; + + System.gc(); + awaitAtMostTenSeconds.untilAsserted(() -> { + LeakAware.track(); + assertThat(loggingEvents.list).hasSize(1) + .allSatisfy(loggingEvent -> { + assertThat(loggingEvent.getLevel()).isEqualTo(Level.ERROR); + assertThat(loggingEvent.getFormattedMessage()).contains("Leak detected", "TestResource"); + }); + }); + } + + @Test + void leakDetectionShouldLogTraceRecordWhenLevelIsAdvanced() throws NoSuchFieldException, IllegalAccessException { + forceChangeLevel(LeakAware.Level.ADVANCED); + ListAppender<ILoggingEvent> loggingEvents = getListAppenderForClass(Disposable.LeakAwareFinalizer.class); + AtomicBoolean atomicBoolean = new AtomicBoolean(false); + LeakResourceSample resourceSample = LeakResourceSample.create(atomicBoolean); + resourceSample = null; + + System.gc(); + awaitAtMostTenSeconds.untilAsserted(() -> { + LeakAware.track(); + assertThat(loggingEvents.list).hasSize(1) + .allSatisfy(loggingEvent -> { + assertThat(loggingEvent.getLevel()).isEqualTo(Level.ERROR); + assertThat(loggingEvent.getFormattedMessage()).contains("This resource was instanced at", "LeakAwareTest#leakDetectionShouldLogTraceRecordWhenLevelIsAdvanced"); + }); + }); + } + + @Test + void leakDetectionShouldThrowWhenDetectedAndLevelIsTesting() throws NoSuchFieldException, IllegalAccessException, InterruptedException { + forceChangeLevel(LeakAware.Level.TESTING); + AtomicBoolean atomicBoolean = new AtomicBoolean(false); + LeakResourceSample resourceSample = LeakResourceSample.create(atomicBoolean); + resourceSample = null; + + System.gc(); + Thread.sleep(500); + assertThatThrownBy(LeakAware::track) + .isInstanceOf(LeakAware.LeakDetectorException.class); + } + + @Test + void leakDetectionShouldNotLogWhenLevelIsNone() throws InterruptedException, NoSuchFieldException, IllegalAccessException { + forceChangeLevel(LeakAware.Level.NONE); + ListAppender<ILoggingEvent> loggingEvents = getListAppenderForClass(Disposable.LeakAwareFinalizer.class); + AtomicBoolean atomicBoolean = new AtomicBoolean(false); + LeakResourceSample resourceSample = LeakResourceSample.create(atomicBoolean); + resourceSample = null; + + System.gc(); + Thread.sleep(500); + assertThat(loggingEvents.list).isEmpty(); + } + +} diff --git a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/JamesDataCmdHandler.java b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/JamesDataCmdHandler.java index 8c6eaaa..0ffa698 100644 --- a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/JamesDataCmdHandler.java +++ b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/JamesDataCmdHandler.java @@ -52,7 +52,7 @@ public class JamesDataCmdHandler extends DataCmdHandler { @Override protected SMTPResponse doDATA(SMTPSession session, String argument) { try { - MimeMessageInputStreamSource mmiss = new MimeMessageInputStreamSource(MailImpl.getId()); + MimeMessageInputStreamSource mmiss = MimeMessageInputStreamSource.create(MailImpl.getId()); session.setAttachment(SMTPConstants.DATA_MIMEMESSAGE_STREAMSOURCE, mmiss, State.Transaction); } catch (Exception e) { LOGGER.warn("Error creating mimemessagesource for incoming data", e); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
