JAMES-2551 adding `inititalized` property inside MemoizedSupplier

For checking whether value from memorized supplier is already initialized or not


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/aa7882d3
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/aa7882d3
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/aa7882d3

Branch: refs/heads/master
Commit: aa7882d3e0380eea5e9cbb02024e43c929d58b46
Parents: e410971
Author: duc <[email protected]>
Authored: Tue Oct 9 16:22:18 2018 +0700
Committer: Benoit Tellier <[email protected]>
Committed: Thu Oct 25 15:07:52 2018 +0700

----------------------------------------------------------------------
 .../backend/rabbitmq/RabbitChannelPoolImpl.java | 36 +++++----
 .../org/apache/james/util/MemoizedSupplier.java | 33 ++++++++-
 .../apache/james/util/MemoizedSupplierTest.java | 77 ++++++++++++++++----
 3 files changed, 116 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/aa7882d3/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java
----------------------------------------------------------------------
diff --git 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java
 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java
index ed58f6c..f5e6934 100644
--- 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java
+++ 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java
@@ -19,9 +19,6 @@
 
 package org.apache.james.backend.rabbitmq;
 
-import java.io.IOException;
-import java.util.function.Supplier;
-
 import javax.annotation.PreDestroy;
 import javax.inject.Inject;
 
@@ -31,6 +28,8 @@ import org.apache.commons.pool2.PooledObject;
 import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.commons.pool2.impl.GenericObjectPool;
 import org.apache.james.util.MemoizedSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.github.fge.lambdas.Throwing;
 import com.rabbitmq.client.Channel;
@@ -38,17 +37,19 @@ import com.rabbitmq.client.Connection;
 
 public class RabbitChannelPoolImpl implements RabbitMQChannelPool {
 
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RabbitChannelPoolImpl.class);
+
     private static class ChannelBasePooledObjectFactory extends 
BasePooledObjectFactory<Channel> {
-        private final Supplier<Connection> rabbitConnection;
+        private final MemoizedSupplier<Connection> rabbitConnectionSupplier;
 
         public ChannelBasePooledObjectFactory(RabbitMQConnectionFactory 
factory) {
-            this.rabbitConnection = MemoizedSupplier.of(
+            this.rabbitConnectionSupplier = MemoizedSupplier.of(
                     Throwing.supplier(factory::create).sneakyThrow());
         }
 
         @Override
         public Channel create() throws Exception {
-            return rabbitConnection.get()
+            return rabbitConnectionSupplier.get()
                     .createChannel();
         }
 
@@ -62,15 +63,20 @@ public class RabbitChannelPoolImpl implements 
RabbitMQChannelPool {
             Channel channel = pooledObject.getObject();
             channel.close();
         }
+
+        private void closeRabbitConnection() {
+            rabbitConnectionSupplier.ifInitialized(
+                
Throwing.<Connection>consumer(Connection::close).sneakyThrow());
+        }
     }
 
     private final ObjectPool<Channel> pool;
-    private final ChannelBasePooledObjectFactory pooledObjectFactory;
+    private final ChannelBasePooledObjectFactory pooledChannelsFactory;
 
     @Inject
     public RabbitChannelPoolImpl(RabbitMQConnectionFactory factory) {
-        pooledObjectFactory = new ChannelBasePooledObjectFactory(factory);
-        pool = new GenericObjectPool<>(pooledObjectFactory);
+        pooledChannelsFactory = new ChannelBasePooledObjectFactory(factory);
+        pool = new GenericObjectPool<>(pooledChannelsFactory);
     }
 
     @Override
@@ -94,11 +100,13 @@ public class RabbitChannelPoolImpl implements 
RabbitMQChannelPool {
     }
 
     @PreDestroy
-    public void close() throws IOException {
-        pool.close();
-        pooledObjectFactory.rabbitConnection
-            .get()
-            .close();
+    public void close() {
+        try {
+            pool.close();
+            pooledChannelsFactory.closeRabbitConnection();
+        } catch (Exception e) {
+            LOGGER.error("error while closing rabbit channels & connections", 
e);
+        }
     }
 
     private Channel borrowChannel() {

http://git-wip-us.apache.org/repos/asf/james-project/blob/aa7882d3/server/container/util/src/main/java/org/apache/james/util/MemoizedSupplier.java
----------------------------------------------------------------------
diff --git 
a/server/container/util/src/main/java/org/apache/james/util/MemoizedSupplier.java
 
b/server/container/util/src/main/java/org/apache/james/util/MemoizedSupplier.java
index d0b4e3b..e7f794b 100644
--- 
a/server/container/util/src/main/java/org/apache/james/util/MemoizedSupplier.java
+++ 
b/server/container/util/src/main/java/org/apache/james/util/MemoizedSupplier.java
@@ -19,12 +19,39 @@
 
 package org.apache.james.util;
 
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 import com.google.common.base.Suppliers;
 
-public class MemoizedSupplier {
-    public static <T> Supplier<T> of(Supplier<T> originalSupplier) {
-        return Suppliers.memoize(originalSupplier::get)::get;
+/**
+ * This supplier is based on memorized supplier from guava(since 
guava-25.1-jre) with additional
+ * information about value initializing state. Because guava's memorized 
supplier
+ * doesn't support client to check whether value is initialized or not.
+ */
+public class MemoizedSupplier<T> implements Supplier<T> {
+    public static <T> MemoizedSupplier<T> of(Supplier<T> originalSupplier) {
+        return new MemoizedSupplier<>(originalSupplier);
+    }
+
+    private final Supplier<T> memorizeSupplier;
+    private final AtomicReference<T> valueReference;
+
+    public MemoizedSupplier(Supplier<T> originalSupplier) {
+        this.memorizeSupplier = Suppliers.memoize(originalSupplier::get);
+        this.valueReference = new AtomicReference<>();
+    }
+
+    public void ifInitialized(Consumer<T> valueConsumer) {
+        T value = valueReference.get();
+        if (value != null) {
+            valueConsumer.accept(value);
+        }
+    }
+
+    @Override
+    public T get() {
+        return this.valueReference.updateAndGet(originalValue -> 
memorizeSupplier.get());
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/aa7882d3/server/container/util/src/test/java/org/apache/james/util/MemoizedSupplierTest.java
----------------------------------------------------------------------
diff --git 
a/server/container/util/src/test/java/org/apache/james/util/MemoizedSupplierTest.java
 
b/server/container/util/src/test/java/org/apache/james/util/MemoizedSupplierTest.java
index e774354..3fa34e7 100644
--- 
a/server/container/util/src/test/java/org/apache/james/util/MemoizedSupplierTest.java
+++ 
b/server/container/util/src/test/java/org/apache/james/util/MemoizedSupplierTest.java
@@ -21,41 +21,58 @@ package org.apache.james.util;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Supplier;
 
-import org.junit.Test;
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
+import org.junit.jupiter.api.Test;
 
-public class MemoizedSupplierTest {
+class MemoizedSupplierTest {
 
     @Test
-    public void getShouldReturnSuppliedValue() {
-        Supplier<Integer> supplier = MemoizedSupplier.of(() -> 42);
+    void getShouldReturnSuppliedValue() {
+        MemoizedSupplier<Integer> supplier = MemoizedSupplier.of(() -> 42);
 
         assertThat(supplier.get()).isEqualTo(42);
     }
 
     @Test
-    public void getShouldBeIdempotent() {
-        Supplier<Integer> supplier = MemoizedSupplier.of(() -> 42);
+    void getShouldBeIdempotent() {
+        MemoizedSupplier<Integer> supplier = MemoizedSupplier.of(() -> 42);
 
         supplier.get();
         assertThat(supplier.get()).isEqualTo(42);
     }
 
+
     @Test
-    public void nullValueShouldBeSupported() {
-        Supplier<Integer> supplier = MemoizedSupplier.of(() -> null);
+    void getShouldReturnSameMemorizedInstanceInParallel() throws Exception {
+        AtomicInteger counter = new AtomicInteger(0);
+        MemoizedSupplier<Integer> supplier = 
MemoizedSupplier.of(counter::incrementAndGet);
+
+        ConcurrentTestRunner.builder()
+            .operation((threadNumber, operationNumber) -> supplier.get())
+            .threadCount(20)
+            .operationCount(10)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+        assertThat(counter.get()).isEqualTo(1);
+    }
+
+    @Test
+    void nullValueShouldBeSupported() {
+        MemoizedSupplier<Integer> supplier = MemoizedSupplier.of(() -> null);
 
         supplier.get();
         assertThat(supplier.get()).isNull();
     }
 
     @Test
-    public void underlyingSupplierShouldBeCalledOnlyOnce() {
+    void underlyingSupplierShouldBeCalledOnlyOnce() {
         AtomicInteger atomicInteger = new AtomicInteger(0);
 
-        Supplier<Integer> supplier = MemoizedSupplier.of(() -> {
+        MemoizedSupplier<Integer> supplier = MemoizedSupplier.of(() -> {
             atomicInteger.incrementAndGet();
             return 42;
         });
@@ -67,10 +84,10 @@ public class MemoizedSupplierTest {
     }
 
     @Test
-    public void 
underlyingSupplierShouldBeCalledOnlyOnceWhenReturningNullValue() {
+    void underlyingSupplierShouldBeCalledOnlyOnceWhenReturningNullValue() {
         AtomicInteger atomicInteger = new AtomicInteger(0);
 
-        Supplier<Integer> supplier = MemoizedSupplier.of(() -> {
+        MemoizedSupplier<Integer> supplier = MemoizedSupplier.of(() -> {
             atomicInteger.incrementAndGet();
             return null;
         });
@@ -81,4 +98,38 @@ public class MemoizedSupplierTest {
         assertThat(atomicInteger.get()).isEqualTo(1);
     }
 
+    @Test
+    void ifInitializedShouldPerformWhenValueIsInitialized() {
+        AtomicBoolean performAfterInitialization = new AtomicBoolean(false);
+        MemoizedSupplier<Integer> supplier = MemoizedSupplier.of(() -> 10);
+
+        supplier.get();
+        supplier.ifInitialized(value -> performAfterInitialization.set(true));
+        assertThat(performAfterInitialization.get()).isTrue();
+    }
+
+    @Test
+    void ifInitializedShouldPerformOnlyOnceWhenValueIsInitializedInParallel() 
throws Exception {
+        AtomicInteger performAfterInitializationCounter = new AtomicInteger(0);
+        MemoizedSupplier<Integer> supplier = MemoizedSupplier.of(() -> 10);
+
+        ConcurrentTestRunner.builder()
+            .operation((threadNumber, operationNumber) -> supplier.get())
+            .threadCount(20)
+            .operationCount(10)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+        supplier.ifInitialized(value -> 
performAfterInitializationCounter.incrementAndGet());
+
+        assertThat(performAfterInitializationCounter.get()).isEqualTo(1);
+    }
+
+
+    @Test
+    void ifInitializedShouldNotPerformWhenValueIsNotInitialized() {
+        AtomicBoolean performAfterInitialization = new AtomicBoolean(false);
+        MemoizedSupplier<Integer> supplier = MemoizedSupplier.of(() -> 10);
+
+        supplier.ifInitialized(value -> performAfterInitialization.set(true));
+        assertThat(performAfterInitialization.get()).isFalse();
+    }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to