JAMES-2551 adding `inititalized` property inside MemoizedSupplier For checking whether value from memorized supplier is already initialized or not
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/aa7882d3 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/aa7882d3 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/aa7882d3 Branch: refs/heads/master Commit: aa7882d3e0380eea5e9cbb02024e43c929d58b46 Parents: e410971 Author: duc <[email protected]> Authored: Tue Oct 9 16:22:18 2018 +0700 Committer: Benoit Tellier <[email protected]> Committed: Thu Oct 25 15:07:52 2018 +0700 ---------------------------------------------------------------------- .../backend/rabbitmq/RabbitChannelPoolImpl.java | 36 +++++---- .../org/apache/james/util/MemoizedSupplier.java | 33 ++++++++- .../apache/james/util/MemoizedSupplierTest.java | 77 ++++++++++++++++---- 3 files changed, 116 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/aa7882d3/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java index ed58f6c..f5e6934 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java @@ -19,9 +19,6 @@ package org.apache.james.backend.rabbitmq; -import java.io.IOException; -import java.util.function.Supplier; - import javax.annotation.PreDestroy; import javax.inject.Inject; @@ -31,6 +28,8 @@ import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.james.util.MemoizedSupplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.github.fge.lambdas.Throwing; import com.rabbitmq.client.Channel; @@ -38,17 +37,19 @@ import com.rabbitmq.client.Connection; public class RabbitChannelPoolImpl implements RabbitMQChannelPool { + private static final Logger LOGGER = LoggerFactory.getLogger(RabbitChannelPoolImpl.class); + private static class ChannelBasePooledObjectFactory extends BasePooledObjectFactory<Channel> { - private final Supplier<Connection> rabbitConnection; + private final MemoizedSupplier<Connection> rabbitConnectionSupplier; public ChannelBasePooledObjectFactory(RabbitMQConnectionFactory factory) { - this.rabbitConnection = MemoizedSupplier.of( + this.rabbitConnectionSupplier = MemoizedSupplier.of( Throwing.supplier(factory::create).sneakyThrow()); } @Override public Channel create() throws Exception { - return rabbitConnection.get() + return rabbitConnectionSupplier.get() .createChannel(); } @@ -62,15 +63,20 @@ public class RabbitChannelPoolImpl implements RabbitMQChannelPool { Channel channel = pooledObject.getObject(); channel.close(); } + + private void closeRabbitConnection() { + rabbitConnectionSupplier.ifInitialized( + Throwing.<Connection>consumer(Connection::close).sneakyThrow()); + } } private final ObjectPool<Channel> pool; - private final ChannelBasePooledObjectFactory pooledObjectFactory; + private final ChannelBasePooledObjectFactory pooledChannelsFactory; @Inject public RabbitChannelPoolImpl(RabbitMQConnectionFactory factory) { - pooledObjectFactory = new ChannelBasePooledObjectFactory(factory); - pool = new GenericObjectPool<>(pooledObjectFactory); + pooledChannelsFactory = new ChannelBasePooledObjectFactory(factory); + pool = new GenericObjectPool<>(pooledChannelsFactory); } @Override @@ -94,11 +100,13 @@ public class RabbitChannelPoolImpl implements RabbitMQChannelPool { } @PreDestroy - public void close() throws IOException { - pool.close(); - pooledObjectFactory.rabbitConnection - .get() - .close(); + public void close() { + try { + pool.close(); + pooledChannelsFactory.closeRabbitConnection(); + } catch (Exception e) { + LOGGER.error("error while closing rabbit channels & connections", e); + } } private Channel borrowChannel() { http://git-wip-us.apache.org/repos/asf/james-project/blob/aa7882d3/server/container/util/src/main/java/org/apache/james/util/MemoizedSupplier.java ---------------------------------------------------------------------- diff --git a/server/container/util/src/main/java/org/apache/james/util/MemoizedSupplier.java b/server/container/util/src/main/java/org/apache/james/util/MemoizedSupplier.java index d0b4e3b..e7f794b 100644 --- a/server/container/util/src/main/java/org/apache/james/util/MemoizedSupplier.java +++ b/server/container/util/src/main/java/org/apache/james/util/MemoizedSupplier.java @@ -19,12 +19,39 @@ package org.apache.james.util; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.function.Supplier; import com.google.common.base.Suppliers; -public class MemoizedSupplier { - public static <T> Supplier<T> of(Supplier<T> originalSupplier) { - return Suppliers.memoize(originalSupplier::get)::get; +/** + * This supplier is based on memorized supplier from guava(since guava-25.1-jre) with additional + * information about value initializing state. Because guava's memorized supplier + * doesn't support client to check whether value is initialized or not. + */ +public class MemoizedSupplier<T> implements Supplier<T> { + public static <T> MemoizedSupplier<T> of(Supplier<T> originalSupplier) { + return new MemoizedSupplier<>(originalSupplier); + } + + private final Supplier<T> memorizeSupplier; + private final AtomicReference<T> valueReference; + + public MemoizedSupplier(Supplier<T> originalSupplier) { + this.memorizeSupplier = Suppliers.memoize(originalSupplier::get); + this.valueReference = new AtomicReference<>(); + } + + public void ifInitialized(Consumer<T> valueConsumer) { + T value = valueReference.get(); + if (value != null) { + valueConsumer.accept(value); + } + } + + @Override + public T get() { + return this.valueReference.updateAndGet(originalValue -> memorizeSupplier.get()); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/aa7882d3/server/container/util/src/test/java/org/apache/james/util/MemoizedSupplierTest.java ---------------------------------------------------------------------- diff --git a/server/container/util/src/test/java/org/apache/james/util/MemoizedSupplierTest.java b/server/container/util/src/test/java/org/apache/james/util/MemoizedSupplierTest.java index e774354..3fa34e7 100644 --- a/server/container/util/src/test/java/org/apache/james/util/MemoizedSupplierTest.java +++ b/server/container/util/src/test/java/org/apache/james/util/MemoizedSupplierTest.java @@ -21,41 +21,58 @@ package org.apache.james.util; import static org.assertj.core.api.Assertions.assertThat; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; -import org.junit.Test; +import org.apache.james.util.concurrency.ConcurrentTestRunner; +import org.junit.jupiter.api.Test; -public class MemoizedSupplierTest { +class MemoizedSupplierTest { @Test - public void getShouldReturnSuppliedValue() { - Supplier<Integer> supplier = MemoizedSupplier.of(() -> 42); + void getShouldReturnSuppliedValue() { + MemoizedSupplier<Integer> supplier = MemoizedSupplier.of(() -> 42); assertThat(supplier.get()).isEqualTo(42); } @Test - public void getShouldBeIdempotent() { - Supplier<Integer> supplier = MemoizedSupplier.of(() -> 42); + void getShouldBeIdempotent() { + MemoizedSupplier<Integer> supplier = MemoizedSupplier.of(() -> 42); supplier.get(); assertThat(supplier.get()).isEqualTo(42); } + @Test - public void nullValueShouldBeSupported() { - Supplier<Integer> supplier = MemoizedSupplier.of(() -> null); + void getShouldReturnSameMemorizedInstanceInParallel() throws Exception { + AtomicInteger counter = new AtomicInteger(0); + MemoizedSupplier<Integer> supplier = MemoizedSupplier.of(counter::incrementAndGet); + + ConcurrentTestRunner.builder() + .operation((threadNumber, operationNumber) -> supplier.get()) + .threadCount(20) + .operationCount(10) + .runSuccessfullyWithin(Duration.ofMinutes(1)); + + assertThat(counter.get()).isEqualTo(1); + } + + @Test + void nullValueShouldBeSupported() { + MemoizedSupplier<Integer> supplier = MemoizedSupplier.of(() -> null); supplier.get(); assertThat(supplier.get()).isNull(); } @Test - public void underlyingSupplierShouldBeCalledOnlyOnce() { + void underlyingSupplierShouldBeCalledOnlyOnce() { AtomicInteger atomicInteger = new AtomicInteger(0); - Supplier<Integer> supplier = MemoizedSupplier.of(() -> { + MemoizedSupplier<Integer> supplier = MemoizedSupplier.of(() -> { atomicInteger.incrementAndGet(); return 42; }); @@ -67,10 +84,10 @@ public class MemoizedSupplierTest { } @Test - public void underlyingSupplierShouldBeCalledOnlyOnceWhenReturningNullValue() { + void underlyingSupplierShouldBeCalledOnlyOnceWhenReturningNullValue() { AtomicInteger atomicInteger = new AtomicInteger(0); - Supplier<Integer> supplier = MemoizedSupplier.of(() -> { + MemoizedSupplier<Integer> supplier = MemoizedSupplier.of(() -> { atomicInteger.incrementAndGet(); return null; }); @@ -81,4 +98,38 @@ public class MemoizedSupplierTest { assertThat(atomicInteger.get()).isEqualTo(1); } + @Test + void ifInitializedShouldPerformWhenValueIsInitialized() { + AtomicBoolean performAfterInitialization = new AtomicBoolean(false); + MemoizedSupplier<Integer> supplier = MemoizedSupplier.of(() -> 10); + + supplier.get(); + supplier.ifInitialized(value -> performAfterInitialization.set(true)); + assertThat(performAfterInitialization.get()).isTrue(); + } + + @Test + void ifInitializedShouldPerformOnlyOnceWhenValueIsInitializedInParallel() throws Exception { + AtomicInteger performAfterInitializationCounter = new AtomicInteger(0); + MemoizedSupplier<Integer> supplier = MemoizedSupplier.of(() -> 10); + + ConcurrentTestRunner.builder() + .operation((threadNumber, operationNumber) -> supplier.get()) + .threadCount(20) + .operationCount(10) + .runSuccessfullyWithin(Duration.ofMinutes(1)); + supplier.ifInitialized(value -> performAfterInitializationCounter.incrementAndGet()); + + assertThat(performAfterInitializationCounter.get()).isEqualTo(1); + } + + + @Test + void ifInitializedShouldNotPerformWhenValueIsNotInitialized() { + AtomicBoolean performAfterInitialization = new AtomicBoolean(false); + MemoizedSupplier<Integer> supplier = MemoizedSupplier.of(() -> 10); + + supplier.ifInitialized(value -> performAfterInitialization.set(true)); + assertThat(performAfterInitialization.get()).isFalse(); + } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
