DonalEvans commented on a change in pull request #6704:
URL: https://github.com/apache/geode/pull/6704#discussion_r676841873
##########
File path:
geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubDUnitTest.java
##########
@@ -244,18 +243,17 @@ public void
shouldContinueToFunction_whenOneServerShutsDownAbruptly_givenTwoSubs
Future<Void> subscriber2Future =
executor.submit(() -> subscriber2.subscribe(mockSubscriber2,
CHANNEL_NAME));
- assertThat(latch.await(30, TimeUnit.SECONDS)).as("channel subscription was
not received")
- .isTrue();
+ assertThat(latch.await(30, TimeUnit.SECONDS))
Review comment:
Is 30 seconds just an arbitrary amount of time to wait here, or is there
some significance to this value?
##########
File path:
geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/services/StripedExecutorServiceJUnitTest.java
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collection;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Heinz Kabutz
+ */
+public class StripedExecutorServiceJUnitTest {
+ @Before
+ public void initialize() {
+ TestRunnable.outOfSequence = false;
+ TestUnstripedRunnable.outOfSequence = false;
+ TestFastRunnable.outOfSequence = false;
+ }
+
+ @Test
+ public void testSingleStripeRunnable() throws InterruptedException {
+ ExecutorService pool = new StripedExecutorService();
+ Object stripe = new Object();
+ AtomicInteger actual = new AtomicInteger(0);
+ for (int i = 0; i < 100; i++) {
+ pool.submit(new TestRunnable(stripe, actual, i));
+ }
+ assertThat(pool.isTerminated()).isFalse();
+ assertThat(pool.isShutdown()).isFalse();
+
+ pool.shutdown();
+
+ assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+ assertThat(TestRunnable.outOfSequence)
+ .as("Expected no out-of-sequence runnables to execute")
+ .isFalse();
+ assertThat(pool.isTerminated()).isTrue();
+ }
+
+ @Test
+ public void testShutdown() throws InterruptedException {
+ ThreadGroup group = new ThreadGroup("stripetestgroup");
+ Thread starter = new Thread(group, "starter") {
+ public void run() {
+ ExecutorService pool = new StripedExecutorService();
+ Object stripe = new Object();
+ AtomicInteger actual = new AtomicInteger(0);
+ for (int i = 0; i < 100; i++) {
+ pool.submit(new TestRunnable(stripe, actual, i));
+ }
+ pool.shutdown();
+ }
+ };
+ starter.start();
+ starter.join();
+
+ for (int i = 0; i < 100; i++) {
+ if (group.activeCount() == 0) {
+ return;
+ }
+ Thread.sleep(100);
+ }
+
+ assertThat(group.activeCount()).isEqualTo(0);
+ }
+
+ @Test
+ public void testShutdownNow() throws InterruptedException {
+ ExecutorService pool = new StripedExecutorService();
+ Object stripe = new Object();
+ AtomicInteger actual = new AtomicInteger(0);
+ for (int i = 0; i < 100; i++) {
+ pool.submit(new TestRunnable(stripe, actual, i));
+ }
+ Thread.sleep(500);
+
+ assertThat(pool.isTerminated()).isFalse();
+ Collection<Runnable> unfinishedJobs = pool.shutdownNow();
+
+ assertThat(pool.isShutdown()).isTrue();
+ assertThat(pool.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
+ assertThat(pool.isTerminated()).isTrue();
+
+ assertThat(unfinishedJobs.size() > 0).isTrue();
+
+ assertThat(unfinishedJobs.size() + actual.intValue()).isEqualTo(100);
+ }
+
+ @Test
+ public void testSingleStripeCallableWithCompletionService()
+ throws InterruptedException, ExecutionException {
+ ExecutorService pool = new StripedExecutorService();
+ final CompletionService<Integer> cs = new ExecutorCompletionService<>(
+ pool);
+
+ Thread testSubmitter = new Thread("TestSubmitter") {
+ public void run() {
+ Object stripe = new Object();
+ for (int i = 0; i < 50; i++) {
+ cs.submit(new TestCallable(stripe, i));
+ }
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ interrupt();
+ }
+ for (int i = 50; i < 100; i++) {
+ cs.submit(new TestCallable(stripe, i));
+ }
+ }
+ };
+ testSubmitter.start();
+
+ for (int i = 0; i < 100; i++) {
+ int actual = cs.take().get().intValue();
+ System.out.println("Retrieved " + actual);
+ assertThat(actual).isEqualTo(i);
+ }
+ pool.shutdown();
+
+ assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+
+ testSubmitter.join();
+ }
+
+ @Test
+ public void testUnstripedRunnable() throws InterruptedException {
+ ExecutorService pool = new StripedExecutorService();
+ AtomicInteger actual = new AtomicInteger(0);
+ for (int i = 0; i < 100; i++) {
+ pool.submit(new TestUnstripedRunnable(actual, i));
+ }
+ pool.shutdown();
+ assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+
+ assertThat(TestUnstripedRunnable.outOfSequence)
+ .as("Expected at least some out-of-sequence runnables to execute")
+ .isTrue();
+ }
+
+ @Test
+ public void testMultipleStripes() throws InterruptedException {
+ final ExecutorService pool = new StripedExecutorService();
+ ExecutorService producerPool = Executors.newCachedThreadPool();
+ for (int i = 0; i < 20; i++) {
+ producerPool.submit(new Runnable() {
Review comment:
This `new Runnable()` can be replaced with a lambda.
##########
File path:
geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/AbstractPubSubIntegrationTest.java
##########
@@ -330,11 +335,14 @@ public void testSubscribeAndPublishUsingBinaryData() {
Long result = publisher.publish(binaryBlob, binaryBlob);
assertThat(result).isEqualTo(1);
+ GeodeAwaitility.await().untilAsserted(
+ () -> assertThat(mockSubscriber.getReceivedMessages()).isNotEmpty());
+
assertThat(mockSubscriber.getReceivedMessages().get(0)).isEqualTo(binaryBlob);
Review comment:
See the comment on `testPublishBinaryData` (earlier in this same file) about
possibly combining these into one assertion.
##########
File path:
geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/AbstractPubSubIntegrationTest.java
##########
@@ -303,11 +306,13 @@ public void testPublishBinaryData() {
Long result = publisher.publish("salutations".getBytes(), expectedMessage);
assertThat(result).isEqualTo(1);
+ GeodeAwaitility.await()
+ .untilAsserted(() ->
assertThat(mockSubscriber.getReceivedMessages()).isNotEmpty());
+
assertThat(mockSubscriber.getReceivedMessages().get(0)).isEqualTo(expectedMessage);
Review comment:
Do we expect that there could be more than one received message here? If
not, then these assertions could be combined into one:
```
GeodeAwaitility.await()
.untilAsserted(() ->
assertThat(mockSubscriber.getReceivedMessages()).containsExactly(expectedMessage));
```
##########
File path:
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java
##########
@@ -42,13 +47,45 @@
public class PubSubImpl implements PubSub {
public static final String REDIS_PUB_SUB_FUNCTION_ID =
"redisPubSubFunctionID";
+ private static final int DEFAULT_PUBLISH_THREAD_COUNT =
+ Integer.getInteger("redis.publish-thread-count", 100);
+
private static final Logger logger = LogService.getLogger();
private final Subscriptions subscriptions;
+ private final ExecutorService executor;
+
+ /**
+ * Inner class to wrap the publish action and pass it to the {@link
StripedExecutorService}.
+ */
+ private static class PublishingRunnable implements StripedRunnable {
+
+ private final Runnable runnable;
+ private final Object stripeIdentity;
Review comment:
Is there a reason this field and the argument that the constructor takes
can't be a `Client`? The only use of the constructor always passes a `Client`
in, so it seems like we can be certain of the type.
##########
File path:
geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/services/StripedExecutorServiceJUnitTest.java
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collection;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Heinz Kabutz
+ */
+public class StripedExecutorServiceJUnitTest {
+ @Before
+ public void initialize() {
+ TestRunnable.outOfSequence = false;
+ TestUnstripedRunnable.outOfSequence = false;
+ TestFastRunnable.outOfSequence = false;
+ }
+
+ @Test
+ public void testSingleStripeRunnable() throws InterruptedException {
+ ExecutorService pool = new StripedExecutorService();
+ Object stripe = new Object();
+ AtomicInteger actual = new AtomicInteger(0);
+ for (int i = 0; i < 100; i++) {
+ pool.submit(new TestRunnable(stripe, actual, i));
+ }
+ assertThat(pool.isTerminated()).isFalse();
+ assertThat(pool.isShutdown()).isFalse();
+
+ pool.shutdown();
+
+ assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+ assertThat(TestRunnable.outOfSequence)
+ .as("Expected no out-of-sequence runnables to execute")
+ .isFalse();
+ assertThat(pool.isTerminated()).isTrue();
+ }
+
+ @Test
+ public void testShutdown() throws InterruptedException {
+ ThreadGroup group = new ThreadGroup("stripetestgroup");
+ Thread starter = new Thread(group, "starter") {
+ public void run() {
+ ExecutorService pool = new StripedExecutorService();
+ Object stripe = new Object();
+ AtomicInteger actual = new AtomicInteger(0);
+ for (int i = 0; i < 100; i++) {
+ pool.submit(new TestRunnable(stripe, actual, i));
+ }
+ pool.shutdown();
+ }
+ };
+ starter.start();
+ starter.join();
+
+ for (int i = 0; i < 100; i++) {
+ if (group.activeCount() == 0) {
+ return;
+ }
+ Thread.sleep(100);
+ }
+
+ assertThat(group.activeCount()).isEqualTo(0);
+ }
+
+ @Test
+ public void testShutdownNow() throws InterruptedException {
+ ExecutorService pool = new StripedExecutorService();
+ Object stripe = new Object();
+ AtomicInteger actual = new AtomicInteger(0);
+ for (int i = 0; i < 100; i++) {
+ pool.submit(new TestRunnable(stripe, actual, i));
+ }
+ Thread.sleep(500);
+
+ assertThat(pool.isTerminated()).isFalse();
+ Collection<Runnable> unfinishedJobs = pool.shutdownNow();
+
+ assertThat(pool.isShutdown()).isTrue();
+ assertThat(pool.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
+ assertThat(pool.isTerminated()).isTrue();
+
+ assertThat(unfinishedJobs.size() > 0).isTrue();
+
+ assertThat(unfinishedJobs.size() + actual.intValue()).isEqualTo(100);
Review comment:
Could this 100 and the one in the for loop be extracted to a variable,
to make it explicit what we're asserting here?
##########
File path:
geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/AbstractPubSubIntegrationTest.java
##########
@@ -715,6 +751,34 @@ public void testPatternWithoutAGlob() {
assertThat(mockSubscriber.getReceivedPMessages()).containsExactly("hello");
}
+ @Test
+ public void concurrentSubscribers_andPublishers_doesNotHang()
+ throws InterruptedException, ExecutionException {
+ AtomicBoolean running = new AtomicBoolean(true);
+
+ Future<Integer> makeSubscribersFuture1 =
+ executor.submit(() -> makeSubscribers(10000, running));
+ Future<Integer> makeSubscribersFuture2 =
+ executor.submit(() -> makeSubscribers(10000, running));
+
+ Future<Integer> publish1 = executor.submit(() -> doPublishing(1, 10000,
running));
+ Future<Integer> publish2 = executor.submit(() -> doPublishing(2, 10000,
running));
+ Future<Integer> publish3 = executor.submit(() -> doPublishing(3, 10000,
running));
+ Future<Integer> publish4 = executor.submit(() -> doPublishing(4, 10000,
running));
+ Future<Integer> publish5 = executor.submit(() -> doPublishing(5, 10000,
running));
+
+ running.set(false);
+
+ assertThat(makeSubscribersFuture1.get()).isGreaterThanOrEqualTo(10);
+ assertThat(makeSubscribersFuture2.get()).isGreaterThanOrEqualTo(10);
Review comment:
Why are these assertions using a value of 10 specifically? Running
locally, both futures return the `minimumIterations` value of 10,000, so it
seems like it should be safe to use a much larger value for this check (though
possibly not 10,000 exactly, if there's a possibility of the test becoming
flaky as a result).
With a value as low as 10 it seems like there's a potential for a race
condition where 10 subscriptions are able to be created before publishing
begins, which could cause the test to pass even if the publishing somehow
caused a problem with creating the subscriptions.
##########
File path:
geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/services/StripedExecutorServiceJUnitTest.java
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collection;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Heinz Kabutz
+ */
+public class StripedExecutorServiceJUnitTest {
+ @Before
+ public void initialize() {
+ TestRunnable.outOfSequence = false;
+ TestUnstripedRunnable.outOfSequence = false;
+ TestFastRunnable.outOfSequence = false;
+ }
+
+ @Test
+ public void testSingleStripeRunnable() throws InterruptedException {
+ ExecutorService pool = new StripedExecutorService();
+ Object stripe = new Object();
+ AtomicInteger actual = new AtomicInteger(0);
+ for (int i = 0; i < 100; i++) {
+ pool.submit(new TestRunnable(stripe, actual, i));
+ }
+ assertThat(pool.isTerminated()).isFalse();
+ assertThat(pool.isShutdown()).isFalse();
+
+ pool.shutdown();
+
+ assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+ assertThat(TestRunnable.outOfSequence)
+ .as("Expected no out-of-sequence runnables to execute")
+ .isFalse();
+ assertThat(pool.isTerminated()).isTrue();
+ }
+
+ @Test
+ public void testShutdown() throws InterruptedException {
+ ThreadGroup group = new ThreadGroup("stripetestgroup");
+ Thread starter = new Thread(group, "starter") {
+ public void run() {
+ ExecutorService pool = new StripedExecutorService();
+ Object stripe = new Object();
+ AtomicInteger actual = new AtomicInteger(0);
+ for (int i = 0; i < 100; i++) {
+ pool.submit(new TestRunnable(stripe, actual, i));
+ }
+ pool.shutdown();
+ }
+ };
+ starter.start();
+ starter.join();
+
+ for (int i = 0; i < 100; i++) {
+ if (group.activeCount() == 0) {
+ return;
+ }
+ Thread.sleep(100);
+ }
+
+ assertThat(group.activeCount()).isEqualTo(0);
+ }
+
+ @Test
+ public void testShutdownNow() throws InterruptedException {
+ ExecutorService pool = new StripedExecutorService();
+ Object stripe = new Object();
+ AtomicInteger actual = new AtomicInteger(0);
+ for (int i = 0; i < 100; i++) {
+ pool.submit(new TestRunnable(stripe, actual, i));
+ }
+ Thread.sleep(500);
Review comment:
Could this sleep be replaced with a suitable `await()`?
##########
File path:
geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/AbstractPubSubIntegrationTest.java
##########
@@ -494,6 +506,25 @@ private void runSubscribeAndPublish(int index, int
minimumIterations, AtomicBool
return mockSubscriber.getReceivedEvents();
}
+ @Test
+ public void ensureOrderingWithOneSubscriberMultiplePublishes() {
+ MockSubscriber mockSubscriber = new MockSubscriber();
+ executor.submit(() -> subscriber.subscribe(mockSubscriber, "salutations"));
Review comment:
Could this String be extracted to a constant? It seems to be used a lot
in this class, and it would be better to not have it hard-coded everywhere.
##########
File path:
geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/services/StripedExecutorServiceJUnitTest.java
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collection;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Heinz Kabutz
+ */
+public class StripedExecutorServiceJUnitTest {
+ @Before
+ public void initialize() {
+ TestRunnable.outOfSequence = false;
+ TestUnstripedRunnable.outOfSequence = false;
+ TestFastRunnable.outOfSequence = false;
+ }
+
+ @Test
+ public void testSingleStripeRunnable() throws InterruptedException {
+ ExecutorService pool = new StripedExecutorService();
+ Object stripe = new Object();
+ AtomicInteger actual = new AtomicInteger(0);
+ for (int i = 0; i < 100; i++) {
+ pool.submit(new TestRunnable(stripe, actual, i));
+ }
+ assertThat(pool.isTerminated()).isFalse();
+ assertThat(pool.isShutdown()).isFalse();
+
+ pool.shutdown();
+
+ assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+ assertThat(TestRunnable.outOfSequence)
+ .as("Expected no out-of-sequence runnables to execute")
+ .isFalse();
+ assertThat(pool.isTerminated()).isTrue();
+ }
+
+ @Test
+ public void testShutdown() throws InterruptedException {
+ ThreadGroup group = new ThreadGroup("stripetestgroup");
+ Thread starter = new Thread(group, "starter") {
+ public void run() {
+ ExecutorService pool = new StripedExecutorService();
+ Object stripe = new Object();
+ AtomicInteger actual = new AtomicInteger(0);
+ for (int i = 0; i < 100; i++) {
+ pool.submit(new TestRunnable(stripe, actual, i));
+ }
+ pool.shutdown();
+ }
+ };
+ starter.start();
+ starter.join();
+
+ for (int i = 0; i < 100; i++) {
+ if (group.activeCount() == 0) {
+ return;
+ }
+ Thread.sleep(100);
+ }
+
+ assertThat(group.activeCount()).isEqualTo(0);
+ }
+
+ @Test
+ public void testShutdownNow() throws InterruptedException {
+ ExecutorService pool = new StripedExecutorService();
+ Object stripe = new Object();
+ AtomicInteger actual = new AtomicInteger(0);
+ for (int i = 0; i < 100; i++) {
+ pool.submit(new TestRunnable(stripe, actual, i));
+ }
+ Thread.sleep(500);
+
+ assertThat(pool.isTerminated()).isFalse();
+ Collection<Runnable> unfinishedJobs = pool.shutdownNow();
+
+ assertThat(pool.isShutdown()).isTrue();
+ assertThat(pool.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
+ assertThat(pool.isTerminated()).isTrue();
+
+ assertThat(unfinishedJobs.size() > 0).isTrue();
+
+ assertThat(unfinishedJobs.size() + actual.intValue()).isEqualTo(100);
+ }
+
+ @Test
+ public void testSingleStripeCallableWithCompletionService()
+ throws InterruptedException, ExecutionException {
+ ExecutorService pool = new StripedExecutorService();
+ final CompletionService<Integer> cs = new ExecutorCompletionService<>(
+ pool);
+
+ Thread testSubmitter = new Thread("TestSubmitter") {
+ public void run() {
+ Object stripe = new Object();
+ for (int i = 0; i < 50; i++) {
+ cs.submit(new TestCallable(stripe, i));
+ }
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ interrupt();
+ }
Review comment:
What is the purpose of this 2-second wait between submitting the first
and second set of 50 callables? Could it be removed?
##########
File path:
geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/services/StripedExecutorServiceJUnitTest.java
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collection;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Heinz Kabutz
+ */
+public class StripedExecutorServiceJUnitTest {
+ @Before
+ public void initialize() {
+ TestRunnable.outOfSequence = false;
+ TestUnstripedRunnable.outOfSequence = false;
+ TestFastRunnable.outOfSequence = false;
+ }
+
+ @Test
+ public void testSingleStripeRunnable() throws InterruptedException {
+ ExecutorService pool = new StripedExecutorService();
+ Object stripe = new Object();
+ AtomicInteger actual = new AtomicInteger(0);
+ for (int i = 0; i < 100; i++) {
+ pool.submit(new TestRunnable(stripe, actual, i));
+ }
+ assertThat(pool.isTerminated()).isFalse();
+ assertThat(pool.isShutdown()).isFalse();
+
+ pool.shutdown();
+
+ assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+ assertThat(TestRunnable.outOfSequence)
+ .as("Expected no out-of-sequence runnables to execute")
+ .isFalse();
+ assertThat(pool.isTerminated()).isTrue();
+ }
+
+ @Test
+ public void testShutdown() throws InterruptedException {
+ ThreadGroup group = new ThreadGroup("stripetestgroup");
+ Thread starter = new Thread(group, "starter") {
+ public void run() {
+ ExecutorService pool = new StripedExecutorService();
+ Object stripe = new Object();
+ AtomicInteger actual = new AtomicInteger(0);
+ for (int i = 0; i < 100; i++) {
+ pool.submit(new TestRunnable(stripe, actual, i));
+ }
+ pool.shutdown();
+ }
+ };
+ starter.start();
+ starter.join();
+
+ for (int i = 0; i < 100; i++) {
+ if (group.activeCount() == 0) {
+ return;
+ }
+ Thread.sleep(100);
+ }
+
+ assertThat(group.activeCount()).isEqualTo(0);
Review comment:
Could this be replaced with an `await()`?
##########
File path:
geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/AbstractPubSubIntegrationTest.java
##########
@@ -436,11 +446,11 @@ public void testPunsubscribingImplicitlyFromAllChannels()
{
}
@Test
- public void ensureOrderingOfPublishedMessages() throws Exception {
+ public void ensureOrderingOfPublishedMessagesWithTwoSubscriptions() throws
Exception {
AtomicBoolean running = new AtomicBoolean(true);
Future<Void> future1 =
- executor.submit(() -> runSubscribeAndPublish(1, 10000, running));
+ executor.submit(() -> runSubscribeAndPublish(1, 3000, running));
Review comment:
A little outside the scope of this PR perhaps, but the `index` parameter can
be safely removed from this method signature and others in this test, since
it's always 1 and was only used to differentiate between multiple
subscriptions, rendering it redundant here.
##########
File path:
geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/services/StripedExecutorServiceJUnitTest.java
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collection;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Heinz Kabutz
+ */
+public class StripedExecutorServiceJUnitTest {
+ @Before
+ public void initialize() {
+ TestRunnable.outOfSequence = false;
+ TestUnstripedRunnable.outOfSequence = false;
+ TestFastRunnable.outOfSequence = false;
+ }
+
+ @Test
+ public void testSingleStripeRunnable() throws InterruptedException {
+ ExecutorService pool = new StripedExecutorService();
+ Object stripe = new Object();
+ AtomicInteger actual = new AtomicInteger(0);
+ for (int i = 0; i < 100; i++) {
+ pool.submit(new TestRunnable(stripe, actual, i));
+ }
+ assertThat(pool.isTerminated()).isFalse();
+ assertThat(pool.isShutdown()).isFalse();
+
+ pool.shutdown();
+
+ assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+ assertThat(TestRunnable.outOfSequence)
+ .as("Expected no out-of-sequence runnables to execute")
+ .isFalse();
+ assertThat(pool.isTerminated()).isTrue();
+ }
+
+ @Test
+ public void testShutdown() throws InterruptedException {
+ ThreadGroup group = new ThreadGroup("stripetestgroup");
+ Thread starter = new Thread(group, "starter") {
+ public void run() {
+ ExecutorService pool = new StripedExecutorService();
+ Object stripe = new Object();
+ AtomicInteger actual = new AtomicInteger(0);
+ for (int i = 0; i < 100; i++) {
+ pool.submit(new TestRunnable(stripe, actual, i));
+ }
+ pool.shutdown();
+ }
+ };
+ starter.start();
+ starter.join();
+
+ for (int i = 0; i < 100; i++) {
+ if (group.activeCount() == 0) {
+ return;
+ }
+ Thread.sleep(100);
+ }
+
+ assertThat(group.activeCount()).isEqualTo(0);
+ }
+
+ @Test
+ public void testShutdownNow() throws InterruptedException {
+ ExecutorService pool = new StripedExecutorService();
+ Object stripe = new Object();
+ AtomicInteger actual = new AtomicInteger(0);
+ for (int i = 0; i < 100; i++) {
+ pool.submit(new TestRunnable(stripe, actual, i));
+ }
+ Thread.sleep(500);
+
+ assertThat(pool.isTerminated()).isFalse();
+ Collection<Runnable> unfinishedJobs = pool.shutdownNow();
+
+ assertThat(pool.isShutdown()).isTrue();
+ assertThat(pool.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
+ assertThat(pool.isTerminated()).isTrue();
+
+ assertThat(unfinishedJobs.size() > 0).isTrue();
+
+ assertThat(unfinishedJobs.size() + actual.intValue()).isEqualTo(100);
+ }
+
+ @Test
+ public void testSingleStripeCallableWithCompletionService()
+ throws InterruptedException, ExecutionException {
+ ExecutorService pool = new StripedExecutorService();
+ final CompletionService<Integer> cs = new ExecutorCompletionService<>(
+ pool);
+
+ Thread testSubmitter = new Thread("TestSubmitter") {
+ public void run() {
+ Object stripe = new Object();
+ for (int i = 0; i < 50; i++) {
+ cs.submit(new TestCallable(stripe, i));
+ }
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ interrupt();
+ }
+ for (int i = 50; i < 100; i++) {
+ cs.submit(new TestCallable(stripe, i));
+ }
+ }
+ };
+ testSubmitter.start();
+
+ for (int i = 0; i < 100; i++) {
+ int actual = cs.take().get().intValue();
+ System.out.println("Retrieved " + actual);
+ assertThat(actual).isEqualTo(i);
+ }
+ pool.shutdown();
+
+ assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+
+ testSubmitter.join();
+ }
+
+ @Test
+ public void testUnstripedRunnable() throws InterruptedException {
+ ExecutorService pool = new StripedExecutorService();
+ AtomicInteger actual = new AtomicInteger(0);
+ for (int i = 0; i < 100; i++) {
+ pool.submit(new TestUnstripedRunnable(actual, i));
+ }
+ pool.shutdown();
+ assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+
+ assertThat(TestUnstripedRunnable.outOfSequence)
+ .as("Expected at least some out-of-sequence runnables to execute")
+ .isTrue();
+ }
+
+ @Test
+ public void testMultipleStripes() throws InterruptedException {
+ final ExecutorService pool = new StripedExecutorService();
+ ExecutorService producerPool = Executors.newCachedThreadPool();
+ for (int i = 0; i < 20; i++) {
+ producerPool.submit(new Runnable() {
+ public void run() {
+ Object stripe = new Object();
+ AtomicInteger actual = new AtomicInteger(0);
+ for (int i = 0; i < 100; i++) {
+ pool.submit(new TestRunnable(stripe, actual, i));
+ }
+ }
+ });
+ }
+ producerPool.shutdown();
+
+ while (!producerPool.awaitTermination(1, TimeUnit.MINUTES)) {
+ ;
+ }
Review comment:
Can this be replaced with an `await()`?
##########
File path:
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/StripedExecutorService.java
##########
@@ -0,0 +1,522 @@
+/*
+ * Copyright (C) 2000-2012 Heinz Max Kabutz
+ *
+ * See the NOTICE file distributed with this work for additional
+ * information regarding copyright ownership. Heinz Max Kabutz licenses
+ * this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * The StripedExecutorService accepts Runnable/Callable objects
+ * that also implement the StripedObject interface. It executes
+ * all the tasks for a single "stripe" consecutively.
+ * <p/>
+ * In this version, submitted tasks do not necessarily have to
+ * implement the StripedObject interface. If they do not, then
+ * they will simply be passed onto the wrapped ExecutorService
+ * directly.
+ * <p/>
+ * Idea inspired by Glenn McGregor on the Concurrency-interest
+ * mailing list and using the SerialExecutor presented in the
+ * Executor interface's JavaDocs.
+ * <p/>
+ * http://cs.oswego.edu/mailman/listinfo/concurrency-interest
+ *
+ * @author Dr Heinz M. Kabutz
+ */
+public class StripedExecutorService extends AbstractExecutorService {
+ /**
+ * The wrapped ExecutorService that will actually execute our
+ * tasks.
+ */
+ private final ExecutorService executor;
+
+ /**
+ * The lock prevents shutdown from being called in the middle
+ * of a submit. It also guards the executors IdentityHashMap.
+ */
+ private final ReentrantLock lock = new ReentrantLock();
+
+ /**
+ * This condition allows us to cleanly terminate this executor
+ * service.
+ */
+ private final Condition terminating = lock.newCondition();
+
+ /**
+ * Whenever a new StripedObject is submitted to the pool, it
+ * is added to this IdentityHashMap. As soon as the
+ * SerialExecutor is empty, the entry is removed from the map,
+ * in order to avoid a memory leak.
+ */
+ private final Map<Object, SerialExecutor> executors = new IdentityHashMap<>();
+
+ /**
+ * The default submit() method creates a new FutureTask and
+ * wraps our StripedRunnable with it. We thus need to
+ * remember the stripe object somewhere. In our case, we will
+ * do this inside the ThreadLocal "stripes". Before the
+ * thread returns from submitting the runnable, it will always
+ * remove the thread local entry.
+ */
+ private static final ThreadLocal<Object> stripes = new ThreadLocal<>();
+
+ /**
+ * Valid states are RUNNING and SHUTDOWN. We rely on the
+ * underlying executor service for the remaining states.
+ */
+ private State state = State.RUNNING;
+
+ private enum State {
+ RUNNING, SHUTDOWN
+ }
+
+ /**
+ * Creates a StripedExecutorService that hands its tasks to the given
+ * executor service.
+ *
+ * Take care using this constructor. The original visibility was private
+ * since users should not shut down their executors directly,
+ * otherwise jobs might get stuck in our queues. Do not shut down the executor
+ * passed in.
+ *
+ * @param executor the executor service that we use to execute
+ * the tasks
+ */
+ public StripedExecutorService(ExecutorService executor) {
+ this.executor = executor;
+ }
+
+ /**
+ * This constructs a StripedExecutorService that wraps a
+ * cached thread pool created with {@link Executors#newCachedThreadPool()}.
+ */
+ public StripedExecutorService() {
+ this(Executors.newCachedThreadPool());
+ }
+
+ /**
+ * This constructs a StripedExecutorService that wraps a fixed
+ * thread pool created with {@link Executors#newFixedThreadPool(int)}.
+ *
+ * @param numberOfThreads the thread count handed to the fixed thread pool
+ */
+ public StripedExecutorService(int numberOfThreads) {
+ this(Executors.newFixedThreadPool(numberOfThreads));
+ }
+
+ /**
+ * If the runnable also implements StripedObject, we store the
+ * stripe object in a thread local, since the actual runnable
+ * will be wrapped with a FutureTask.
+ *
+ * @param runnable the task being wrapped
+ * @param value the result passed through to the superclass implementation
+ * @return the RunnableFuture created by the superclass implementation
+ */
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+ saveStripedObject(runnable);
+ return super.newTaskFor(runnable, value);
+ }
+
+ /**
+ * If the callable also implements StripedObject, we store the
+ * stripe object in a thread local, since the actual callable
+ * will be wrapped with a FutureTask.
+ *
+ * @param callable the task being wrapped
+ * @return the RunnableFuture created by the superclass implementation
+ */
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+ saveStripedObject(callable);
+ return super.newTaskFor(callable);
+ }
+
+ /**
+ * Remembers the task's stripe in the "stripes" thread local so that it
+ * is still available once the task has been wrapped. Tasks that do not
+ * implement StripedObject leave the thread local untouched.
+ */
+ private void saveStripedObject(Object task) {
+ if (!(task instanceof StripedObject)) {
+ return;
+ }
+ StripedObject striped = (StripedObject) task;
+ stripes.set(striped.getStripe());
+ }
+
+ /**
+ * Returns true if the object implements the StripedObject
+ * interface.
+ *
+ * @param o the object to test
+ * @return true if {@code o instanceof StripedObject}; false otherwise
+ */
+ private static boolean isStripedObject(Object o) {
+ return o instanceof StripedObject;
+ }
+
+ /**
+ * Delegates the call to submit(task, null).
+ *
+ * @param task the task to submit
+ * @return the Future returned by submit(task, null)
+ */
+ public Future<?> submit(Runnable task) {
+ return submit(task, null);
+ }
+
+ /**
+ * Submits a task together with the result its Future should yield.
+ * While holding the lock we first verify that the pool is running
+ * (a RejectedExecutionException is thrown otherwise). A task that is a
+ * StripedObject goes through the inherited submit(), so that it is
+ * executed in-order by its stripe; any other task is handed straight
+ * to the wrapped executor.
+ */
+ public <T> Future<T> submit(Runnable task, T result) {
+ lock.lock();
+ try {
+ checkPoolIsRunning();
+ return isStripedObject(task)
+ ? super.submit(task, result)
+ : executor.submit(task, result); // bypass the serial executors
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Submits a callable. While holding the lock we first verify that the
+ * pool is running (a RejectedExecutionException is thrown otherwise).
+ * A callable that is a StripedObject goes through the inherited
+ * submit(), so that it is executed in-order by its stripe; any other
+ * callable is handed straight to the wrapped executor.
+ */
+ public <T> Future<T> submit(Callable<T> task) {
+ lock.lock();
+ try {
+ checkPoolIsRunning();
+ return isStripedObject(task)
+ ? super.submit(task)
+ : executor.submit(task); // bypass the serial executors
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Guard used by the submitting methods: returns normally while the
+ * state is RUNNING and throws a RejectedExecutionException in any
+ * other state. The caller is expected to hold the lock (asserted).
+ */
+ private void checkPoolIsRunning() {
+ assert lock.isHeldByCurrentThread();
+ if (state == State.RUNNING) {
+ return;
+ }
+ throw new RejectedExecutionException("executor not running");
+ }
+
+ /**
+ * Executes the command. If command implements StripedObject,
+ * we execute it with a SerialExecutor. This method can be
+ * called directly by clients or it may be called by the
+ * AbstractExecutorService's submit() methods. In that case,
+ * we check whether the stripes thread local has been set. If
+ * it is, we remove it and use it to determine the
+ * StripedObject and execute it with a SerialExecutor. If no
+ * StripedObject is set, we instead pass the command to the
+ * wrapped ExecutorService directly.
+ *
+ * @param command the task to run
+ * @throws RejectedExecutionException if the pool is not running
+ */
+ public void execute(Runnable command) {
+ lock.lock();
+ try {
+ // Consume the stripe before the state check: getStripe() clears the
+ // static thread local, and that must also happen when the command is
+ // rejected. Otherwise a stripe saved by newTaskFor() would linger on
+ // this thread and be applied to its next unstriped command.
+ Object stripe = getStripe(command);
+ checkPoolIsRunning();
+ if (stripe != null) {
+ SerialExecutor serialExecutor = executors.get(stripe);
+ if (serialExecutor == null) {
+ serialExecutor = new SerialExecutor(stripe);
+ executors.put(stripe, serialExecutor);
+ }
+ serialExecutor.execute(command);
+ } else {
+ executor.execute(command);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Determines the stripe for a command: the command's own stripe when
+ * it implements StripedObject, otherwise whatever is currently held in
+ * the thread local temporary storage (possibly null). The thread local
+ * is cleared in both cases before returning.
+ */
+ private Object getStripe(Runnable command) {
+ boolean striped = command instanceof StripedObject;
+ Object stripe = striped
+ ? ((StripedObject) command).getStripe()
+ : stripes.get();
+ stripes.remove();
+ return stripe;
+ }
+
+ /**
+ * Shuts down the StripedExecutorService. No more tasks will
+ * be submitted. If the map of SerialExecutors is empty, we
+ * shut down the wrapped executor.
+ *
+ * The state change and the emptiness check both happen while holding
+ * the lock. When the map is not empty, this method itself does not
+ * shut down the wrapped executor.
+ */
+ public void shutdown() {
+ lock.lock();
+ try {
+ state = State.SHUTDOWN;
+ if (executors.isEmpty()) {
+ executor.shutdown();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * All the tasks in each of the SerialExecutors are drained
+ * to a list, as well as the tasks inside the wrapped
+ * ExecutorService. This is then returned to the user. Also,
+ * the shutdownNow method of the wrapped executor is called.
+ *
+ * @return the tasks drained from the SerialExecutors followed by the
+ * tasks returned by the wrapped executor's shutdownNow()
+ */
+ public List<Runnable> shutdownNow() {
+ lock.lock();
+ try {
+ shutdown(); // locks again; the lock is a ReentrantLock already held by this thread
+ List<Runnable> result = new ArrayList<>();
+ for (SerialExecutor ser_ex : executors.values()) {
+ ser_ex.tasks.drainTo(result);
+ }
+ result.addAll(executor.shutdownNow());
+ return result;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Reports whether the state has been switched to SHUTDOWN, which is
+ * what shutdown() (and therefore shutdownNow()) does. The state is read
+ * while holding the lock.
+ */
+ public boolean isShutdown() {
+ boolean shutDown;
+ lock.lock();
+ try {
+ shutDown = (state == State.SHUTDOWN);
+ } finally {
+ lock.unlock();
+ }
+ return shutDown;
+ }
+
+ /**
+ * Returns true if this pool has been terminated, that is, all
+ * the SerialExecutors are empty and the wrapped
+ * ExecutorService has been terminated.
+ *
+ * @return false while the state is RUNNING or while any SerialExecutor
+ * is not empty; otherwise the wrapped executor's isTerminated()
+ */
+ public boolean isTerminated() {
+ lock.lock();
+ try {
+ if (state == State.RUNNING) {
+ return false;
+ }
+ // The loop variable is deliberately not named "executor", so that
+ // the final return unambiguously refers to the wrapped executor field.
+ for (SerialExecutor serialExecutor : executors.values()) {
+ if (!serialExecutor.isEmpty()) {
+ return false;
+ }
+ }
+ return executor.isTerminated();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns true if the wrapped ExecutorService terminates
+ * within the allotted amount of time.
+ */
+ public boolean awaitTermination(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ lock.lock();
+ try {
+ long waitUntil = System.nanoTime() + unit.toNanos(timeout);
+ long remainingTime;
+ while ((remainingTime = waitUntil - System.nanoTime()) > 0
+ && !executors.isEmpty()) {
+ terminating.awaitNanos(remainingTime);
+ }
+ if (remainingTime <= 0)
+ return false;
+ if (executors.isEmpty()) {
Review comment:
This statement should always be true when reached, as the return
statement above it ensures that it's only reachable if we exited the while
statement due to `executors.isEmpty()` returning true rather than due to
exceeding the timeout. Given that `executors` is only added to while holding
the lock, it shouldn't be possible for the value of `executors.isEmpty()` to
change after it evaluates to true in the while loop.
##########
File path:
geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/services/StripedExecutorServiceJUnitTest.java
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collection;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Heinz Kabutz
+ */
+public class StripedExecutorServiceJUnitTest {
+ /**
+ * Clears the static outOfSequence flag of every test task class
+ * before each test.
+ */
+ @Before
+ public void initialize() {
+ TestFastRunnable.outOfSequence = false;
+ TestUnstripedRunnable.outOfSequence = false;
+ TestRunnable.outOfSequence = false;
+ }
+
+ @Test
+ public void testSingleStripeRunnable() throws InterruptedException {
+ ExecutorService pool = new StripedExecutorService();
+ Object stripe = new Object();
+ AtomicInteger actual = new AtomicInteger(0);
+ for (int i = 0; i < 100; i++) {
+ pool.submit(new TestRunnable(stripe, actual, i));
+ }
+ assertThat(pool.isTerminated()).isFalse();
+ assertThat(pool.isShutdown()).isFalse();
+
+ pool.shutdown();
+
+ assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+ assertThat(TestRunnable.outOfSequence)
+ .as("Expected no out-of-sequence runnables to execute")
+ .isFalse();
+ assertThat(pool.isTerminated()).isTrue();
+ }
+
+ @Test // checks that no thread of the test's ThreadGroup is still active some time after shutdown()
+ public void testShutdown() throws InterruptedException {
+ ThreadGroup group = new ThreadGroup("stripetestgroup");
+ Thread starter = new Thread(group, "starter") { // presumably the pool's workers inherit this group - confirm
+ public void run() {
+ ExecutorService pool = new StripedExecutorService();
+ Object stripe = new Object();
+ AtomicInteger actual = new AtomicInteger(0);
+ for (int i = 0; i < 100; i++) {
+ pool.submit(new TestRunnable(stripe, actual, i));
+ }
+ pool.shutdown();
+ }
+ };
+ starter.start();
+ starter.join();
+
+ for (int i = 0; i < 100; i++) { // poll up to 100 times, sleeping 100 ms between polls
+ if (group.activeCount() == 0) {
+ return; // nothing left active in the group: test passes
+ }
+ Thread.sleep(100);
+ }
+
+ assertThat(group.activeCount()).isEqualTo(0); // fails if threads are still active after polling
+ }
+
+ /**
+ * Submits 100 tasks on one stripe, lets them run for 500 ms, then calls
+ * shutdownNow() and checks that the pool terminates, that some jobs were
+ * returned unfinished, and that unfinished plus counted jobs total 100.
+ */
+ @Test
+ public void testShutdownNow() throws InterruptedException {
+ ExecutorService pool = new StripedExecutorService();
+ Object stripe = new Object();
+ AtomicInteger actual = new AtomicInteger(0);
+ for (int i = 0; i < 100; i++) {
+ pool.submit(new TestRunnable(stripe, actual, i));
+ }
+ Thread.sleep(500);
+
+ assertThat(pool.isTerminated()).isFalse();
+ Collection<Runnable> unfinishedJobs = pool.shutdownNow();
+
+ assertThat(pool.isShutdown()).isTrue();
+ assertThat(pool.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
+ assertThat(pool.isTerminated()).isTrue();
+
+ // Assert on the collection itself so a failure reports its contents
+ // instead of a bare "expected true but was false".
+ assertThat(unfinishedJobs).isNotEmpty();
+
+ assertThat(unfinishedJobs.size() + actual.intValue()).isEqualTo(100);
+ }
+
+ @Test
+ public void testSingleStripeCallableWithCompletionService()
+ throws InterruptedException, ExecutionException {
+ ExecutorService pool = new StripedExecutorService();
+ final CompletionService<Integer> cs = new ExecutorCompletionService<>(
+ pool);
+
+ Thread testSubmitter = new Thread("TestSubmitter") {
+ public void run() {
+ Object stripe = new Object();
+ for (int i = 0; i < 50; i++) {
+ cs.submit(new TestCallable(stripe, i));
+ }
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ interrupt();
+ }
+ for (int i = 50; i < 100; i++) {
+ cs.submit(new TestCallable(stripe, i));
+ }
+ }
+ };
+ testSubmitter.start();
+
+ for (int i = 0; i < 100; i++) {
+ int actual = cs.take().get().intValue();
Review comment:
The call to `intValue()` is unnecessary here and can be removed.
##########
File path:
geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/services/StripedExecutorServiceJUnitTest.java
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collection;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Heinz Kabutz
+ */
+public class StripedExecutorServiceJUnitTest {
+ /**
+ * Clears the static outOfSequence flag of every test task class
+ * before each test.
+ */
+ @Before
+ public void initialize() {
+ TestFastRunnable.outOfSequence = false;
+ TestUnstripedRunnable.outOfSequence = false;
+ TestRunnable.outOfSequence = false;
+ }
+
+ @Test
+ public void testSingleStripeRunnable() throws InterruptedException {
+ ExecutorService pool = new StripedExecutorService();
+ Object stripe = new Object();
+ AtomicInteger actual = new AtomicInteger(0);
+ for (int i = 0; i < 100; i++) {
+ pool.submit(new TestRunnable(stripe, actual, i));
+ }
+ assertThat(pool.isTerminated()).isFalse();
+ assertThat(pool.isShutdown()).isFalse();
+
+ pool.shutdown();
+
+ assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+ assertThat(TestRunnable.outOfSequence)
+ .as("Expected no out-of-sequence runnables to execute")
+ .isFalse();
+ assertThat(pool.isTerminated()).isTrue();
+ }
+
+ @Test // checks that no thread of the test's ThreadGroup is still active some time after shutdown()
+ public void testShutdown() throws InterruptedException {
+ ThreadGroup group = new ThreadGroup("stripetestgroup");
+ Thread starter = new Thread(group, "starter") { // presumably the pool's workers inherit this group - confirm
+ public void run() {
+ ExecutorService pool = new StripedExecutorService();
+ Object stripe = new Object();
+ AtomicInteger actual = new AtomicInteger(0);
+ for (int i = 0; i < 100; i++) {
+ pool.submit(new TestRunnable(stripe, actual, i));
+ }
+ pool.shutdown();
+ }
+ };
+ starter.start();
+ starter.join();
+
+ for (int i = 0; i < 100; i++) { // poll up to 100 times, sleeping 100 ms between polls
+ if (group.activeCount() == 0) {
+ return; // nothing left active in the group: test passes
+ }
+ Thread.sleep(100);
+ }
+
+ assertThat(group.activeCount()).isEqualTo(0); // fails if threads are still active after polling
+ }
+
+ /**
+ * Submits 100 tasks on one stripe, lets them run for 500 ms, then calls
+ * shutdownNow() and checks that the pool terminates, that some jobs were
+ * returned unfinished, and that unfinished plus counted jobs total 100.
+ */
+ @Test
+ public void testShutdownNow() throws InterruptedException {
+ ExecutorService pool = new StripedExecutorService();
+ Object stripe = new Object();
+ AtomicInteger actual = new AtomicInteger(0);
+ for (int i = 0; i < 100; i++) {
+ pool.submit(new TestRunnable(stripe, actual, i));
+ }
+ Thread.sleep(500);
+
+ assertThat(pool.isTerminated()).isFalse();
+ Collection<Runnable> unfinishedJobs = pool.shutdownNow();
+
+ assertThat(pool.isShutdown()).isTrue();
+ assertThat(pool.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
+ assertThat(pool.isTerminated()).isTrue();
+
+ // Assert on the collection itself so a failure reports its contents
+ // instead of a bare "expected true but was false".
+ assertThat(unfinishedJobs).isNotEmpty();
+
+ assertThat(unfinishedJobs.size() + actual.intValue()).isEqualTo(100);
+ }
+
+ /**
+ * Submits 100 callables on one stripe through an
+ * ExecutorCompletionService (in two batches of 50, two seconds apart)
+ * and checks that the results are retrieved in submission order.
+ */
+ @Test
+ public void testSingleStripeCallableWithCompletionService()
+ throws InterruptedException, ExecutionException {
+ ExecutorService pool = new StripedExecutorService();
+ final CompletionService<Integer> cs = new ExecutorCompletionService<>(
+ pool);
+
+ Thread testSubmitter = new Thread("TestSubmitter") {
+ public void run() {
+ Object stripe = new Object();
+ for (int i = 0; i < 50; i++) {
+ cs.submit(new TestCallable(stripe, i));
+ }
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ interrupt();
+ }
+ for (int i = 50; i < 100; i++) {
+ cs.submit(new TestCallable(stripe, i));
+ }
+ }
+ };
+ testSubmitter.start();
+
+ for (int i = 0; i < 100; i++) {
+ // The Integer result is auto-unboxed; no explicit intValue() is needed.
+ int actual = cs.take().get();
+ System.out.println("Retrieved " + actual);
+ assertThat(actual).isEqualTo(i);
+ }
+ pool.shutdown();
+
+ assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+
+ testSubmitter.join();
+ }
+
+ @Test
+ public void testUnstripedRunnable() throws InterruptedException {
+ ExecutorService pool = new StripedExecutorService();
+ AtomicInteger actual = new AtomicInteger(0);
+ for (int i = 0; i < 100; i++) {
+ pool.submit(new TestUnstripedRunnable(actual, i));
+ }
+ pool.shutdown();
+ assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+
+ assertThat(TestUnstripedRunnable.outOfSequence)
+ .as("Expected at least some out-of-sequence runnables to execute")
+ .isTrue();
+ }
+
+ /**
+ * Runs 20 producers, each submitting 100 tasks on its own stripe, and
+ * checks that TestRunnable.outOfSequence stays false once the pool has
+ * terminated.
+ */
+ @Test
+ public void testMultipleStripes() throws InterruptedException {
+ final ExecutorService pool = new StripedExecutorService();
+ ExecutorService producerPool = Executors.newCachedThreadPool();
+ for (int i = 0; i < 20; i++) {
+ producerPool.submit(new Runnable() {
+ public void run() {
+ Object stripe = new Object();
+ AtomicInteger actual = new AtomicInteger(0);
+ for (int j = 0; j < 100; j++) {
+ pool.submit(new TestRunnable(stripe, actual, j));
+ }
+ }
+ });
+ }
+ producerPool.shutdown();
+
+ // Bounded wait: fail the test if the producers do not finish, rather
+ // than spinning forever in an empty loop.
+ assertThat(producerPool.awaitTermination(1, TimeUnit.MINUTES))
+ .as("Expected all producers to finish submitting")
+ .isTrue();
+
+ pool.shutdown();
+
+ assertThat(pool.awaitTermination(1, TimeUnit.DAYS)).isTrue();
+ assertThat(TestRunnable.outOfSequence)
+ .as("Expected no out-of-sequence runnables to execute")
+ .isFalse();
+ }
+
+
+ @Test
+ public void testMultipleFastStripes() throws InterruptedException {
Review comment:
The comments that apply to `testMultipleStripes()` also apply here.
##########
File path:
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/StripedExecutorService.java
##########
@@ -0,0 +1,522 @@
+/*
+ * Copyright (C) 2000-2012 Heinz Max Kabutz
+ *
+ * See the NOTICE file distributed with this work for additional
+ * information regarding copyright ownership. Heinz Max Kabutz licenses
+ * this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * The StripedExecutorService accepts Runnable/Callable objects
+ * that also implement the StripedObject interface. It executes
+ * all the tasks for a single "stripe" consecutively.
+ * <p/>
+ * In this version, submitted tasks do not necessarily have to
+ * implement the StripedObject interface. If they do not, then
+ * they will simply be passed onto the wrapped ExecutorService
+ * directly.
+ * <p/>
+ * Idea inspired by Glenn McGregor on the Concurrency-interest
+ * mailing list and using the SerialExecutor presented in the
+ * Executor interface's JavaDocs.
+ * <p/>
+ * http://cs.oswego.edu/mailman/listinfo/concurrency-interest
+ *
+ * @author Dr Heinz M. Kabutz
+ */
+public class StripedExecutorService extends AbstractExecutorService {
+ /**
+ * The wrapped ExecutorService that will actually execute our
+ * tasks.
+ */
+ private final ExecutorService executor;
+
+ /**
+ * The lock prevents shutdown from being called in the middle
+ * of a submit. It also guards the executors IdentityHashMap.
+ */
+ private final ReentrantLock lock = new ReentrantLock();
+
+ /**
+ * This condition allows us to cleanly terminate this executor
+ * service.
+ */
+ private final Condition terminating = lock.newCondition();
+
+ /**
+ * Whenever a new StripedObject is submitted to the pool, it
+ * is added to this IdentityHashMap. As soon as the
+ * SerialExecutor is empty, the entry is removed from the map,
+ * in order to avoid a memory leak.
+ */
+ private final Map<Object, SerialExecutor> executors = new IdentityHashMap<>();
+
+ /**
+ * The default submit() method creates a new FutureTask and
+ * wraps our StripedRunnable with it. We thus need to
+ * remember the stripe object somewhere. In our case, we will
+ * do this inside the ThreadLocal "stripes". Before the
+ * thread returns from submitting the runnable, it will always
+ * remove the thread local entry.
+ */
+ private static final ThreadLocal<Object> stripes = new ThreadLocal<>();
+
+ /**
+ * Valid states are RUNNING and SHUTDOWN. We rely on the
+ * underlying executor service for the remaining states.
+ */
+ private State state = State.RUNNING;
+
+ private enum State {
+ RUNNING, SHUTDOWN
+ }
+
+ /**
+ * Creates a StripedExecutorService that hands its tasks to the given
+ * executor service.
+ *
+ * Take care using this constructor. The original visibility was private
+ * since users should not shut down their executors directly,
+ * otherwise jobs might get stuck in our queues. Do not shut down the executor
+ * passed in.
+ *
+ * @param executor the executor service that we use to execute
+ * the tasks
+ */
+ public StripedExecutorService(ExecutorService executor) {
+ this.executor = executor;
+ }
+
+ /**
+ * This constructs a StripedExecutorService that wraps a
+ * cached thread pool created with {@link Executors#newCachedThreadPool()}.
+ */
+ public StripedExecutorService() {
+ this(Executors.newCachedThreadPool());
+ }
+
+ /**
+ * This constructs a StripedExecutorService that wraps a fixed
+ * thread pool created with {@link Executors#newFixedThreadPool(int)}.
+ *
+ * @param numberOfThreads the thread count handed to the fixed thread pool
+ */
+ public StripedExecutorService(int numberOfThreads) {
+ this(Executors.newFixedThreadPool(numberOfThreads));
+ }
+
+ /**
+ * If the runnable also implements StripedObject, we store the
+ * stripe object in a thread local, since the actual runnable
+ * will be wrapped with a FutureTask.
+ *
+ * @param runnable the task being wrapped
+ * @param value the result passed through to the superclass implementation
+ * @return the RunnableFuture created by the superclass implementation
+ */
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+ saveStripedObject(runnable);
+ return super.newTaskFor(runnable, value);
+ }
+
+ /**
+ * If the callable also implements StripedObject, we store the
+ * stripe object in a thread local, since the actual callable
+ * will be wrapped with a FutureTask.
+ *
+ * @param callable the task being wrapped
+ * @return the RunnableFuture created by the superclass implementation
+ */
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+ saveStripedObject(callable);
+ return super.newTaskFor(callable);
+ }
+
+ /**
+ * Remembers the task's stripe in the "stripes" thread local so that it
+ * is still available once the task has been wrapped. Tasks that do not
+ * implement StripedObject leave the thread local untouched.
+ */
+ private void saveStripedObject(Object task) {
+ if (!(task instanceof StripedObject)) {
+ return;
+ }
+ StripedObject striped = (StripedObject) task;
+ stripes.set(striped.getStripe());
+ }
+
+ /**
+ * Returns true if the object implements the StripedObject
+ * interface.
+ *
+ * @param o the object to test
+ * @return true if {@code o instanceof StripedObject}; false otherwise
+ */
+ private static boolean isStripedObject(Object o) {
+ return o instanceof StripedObject;
+ }
+
+ /**
+ * Delegates the call to submit(task, null).
+ *
+ * @param task the task to submit
+ * @return the Future returned by submit(task, null)
+ */
+ public Future<?> submit(Runnable task) {
+ return submit(task, null);
+ }
+
+ /**
+ * Submits a task together with the result its Future should yield.
+ * While holding the lock we first verify that the pool is running
+ * (a RejectedExecutionException is thrown otherwise). A task that is a
+ * StripedObject goes through the inherited submit(), so that it is
+ * executed in-order by its stripe; any other task is handed straight
+ * to the wrapped executor.
+ */
+ public <T> Future<T> submit(Runnable task, T result) {
+ lock.lock();
+ try {
+ checkPoolIsRunning();
+ return isStripedObject(task)
+ ? super.submit(task, result)
+ : executor.submit(task, result); // bypass the serial executors
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Submits a callable. While holding the lock we first verify that the
+ * pool is running (a RejectedExecutionException is thrown otherwise).
+ * A callable that is a StripedObject goes through the inherited
+ * submit(), so that it is executed in-order by its stripe; any other
+ * callable is handed straight to the wrapped executor.
+ */
+ public <T> Future<T> submit(Callable<T> task) {
+ lock.lock();
+ try {
+ checkPoolIsRunning();
+ return isStripedObject(task)
+ ? super.submit(task)
+ : executor.submit(task); // bypass the serial executors
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Guard used by the submitting methods: returns normally while the
+ * state is RUNNING and throws a RejectedExecutionException in any
+ * other state. The caller is expected to hold the lock (asserted).
+ */
+ private void checkPoolIsRunning() {
+ assert lock.isHeldByCurrentThread();
+ if (state == State.RUNNING) {
+ return;
+ }
+ throw new RejectedExecutionException("executor not running");
+ }
+
+ /**
+ * Executes the command. If command implements StripedObject,
+ * we execute it with a SerialExecutor. This method can be
+ * called directly by clients or it may be called by the
+ * AbstractExecutorService's submit() methods. In that case,
+ * we check whether the stripes thread local has been set. If
+ * it is, we remove it and use it to determine the
+ * StripedObject and execute it with a SerialExecutor. If no
+ * StripedObject is set, we instead pass the command to the
+ * wrapped ExecutorService directly.
+ *
+ * @param command the task to run
+ * @throws RejectedExecutionException if the pool is not running
+ */
+ public void execute(Runnable command) {
+ lock.lock();
+ try {
+ // Consume the stripe before the state check: getStripe() clears the
+ // static thread local, and that must also happen when the command is
+ // rejected. Otherwise a stripe saved by newTaskFor() would linger on
+ // this thread and be applied to its next unstriped command.
+ Object stripe = getStripe(command);
+ checkPoolIsRunning();
+ if (stripe != null) {
+ SerialExecutor serialExecutor = executors.get(stripe);
+ if (serialExecutor == null) {
+ serialExecutor = new SerialExecutor(stripe);
+ executors.put(stripe, serialExecutor);
+ }
+ serialExecutor.execute(command);
+ } else {
+ executor.execute(command);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Determines the stripe for a command: the command's own stripe when
+ * it implements StripedObject, otherwise whatever is currently held in
+ * the thread local temporary storage (possibly null). The thread local
+ * is cleared in both cases before returning.
+ */
+ private Object getStripe(Runnable command) {
+ boolean striped = command instanceof StripedObject;
+ Object stripe = striped
+ ? ((StripedObject) command).getStripe()
+ : stripes.get();
+ stripes.remove();
+ return stripe;
+ }
+
+ /**
+ * Shuts down the StripedExecutorService. No more tasks will
+ * be submitted. If the map of SerialExecutors is empty, we
+ * shut down the wrapped executor.
+ *
+ * The state change and the emptiness check both happen while holding
+ * the lock. When the map is not empty, this method itself does not
+ * shut down the wrapped executor.
+ */
+ public void shutdown() {
+ lock.lock();
+ try {
+ state = State.SHUTDOWN;
+ if (executors.isEmpty()) {
+ executor.shutdown();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * All the tasks in each of the SerialExecutors are drained
+ * to a list, as well as the tasks inside the wrapped
+ * ExecutorService. This is then returned to the user. Also,
+ * the shutdownNow method of the wrapped executor is called.
+ *
+ * @return the tasks drained from the SerialExecutors followed by the
+ * tasks returned by the wrapped executor's shutdownNow()
+ */
+ public List<Runnable> shutdownNow() {
+ lock.lock();
+ try {
+ shutdown(); // locks again; the lock is a ReentrantLock already held by this thread
+ List<Runnable> result = new ArrayList<>();
+ for (SerialExecutor ser_ex : executors.values()) {
+ ser_ex.tasks.drainTo(result);
+ }
+ result.addAll(executor.shutdownNow());
+ return result;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Reports whether the state has been switched to SHUTDOWN, which is
+ * what shutdown() (and therefore shutdownNow()) does. The state is read
+ * while holding the lock.
+ */
+ public boolean isShutdown() {
+ boolean shutDown;
+ lock.lock();
+ try {
+ shutDown = (state == State.SHUTDOWN);
+ } finally {
+ lock.unlock();
+ }
+ return shutDown;
+ }
+
+ /**
+ * Returns true if this pool has been terminated, that is, all
+ * the SerialExecutors are empty and the wrapped
+ * ExecutorService has been terminated.
+ */
+ public boolean isTerminated() {
+ lock.lock();
+ try {
+ if (state == State.RUNNING)
+ return false;
+ for (SerialExecutor executor : executors.values()) {
+ if (!executor.isEmpty())
+ return false;
Review comment:
These if statements should have curly braces.
##########
File path:
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/StripedExecutorService.java
##########
@@ -0,0 +1,522 @@
+/*
+ * Copyright (C) 2000-2012 Heinz Max Kabutz
+ *
+ * See the NOTICE file distributed with this work for additional
+ * information regarding copyright ownership. Heinz Max Kabutz licenses
+ * this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * The StripedExecutorService accepts Runnable/Callable objects
+ * that also implement the StripedObject interface. It executes
+ * all the tasks for a single "stripe" consecutively.
+ * <p/>
+ * In this version, submitted tasks do not necessarily have to
+ * implement the StripedObject interface. If they do not, then
+ * they will simply be passed onto the wrapped ExecutorService
+ * directly.
+ * <p/>
+ * Idea inspired by Glenn McGregor on the Concurrency-interest
+ * mailing list and using the SerialExecutor presented in the
+ * Executor interface's JavaDocs.
+ * <p/>
+ * http://cs.oswego.edu/mailman/listinfo/concurrency-interest
+ *
+ * @author Dr Heinz M. Kabutz
+ */
+public class StripedExecutorService extends AbstractExecutorService {
+ /**
+ * The wrapped ExecutorService that will actually execute our
+ * tasks.
+ */
+ private final ExecutorService executor;
+
+ /**
+ * The lock prevents shutdown from being called in the middle
+ * of a submit. It also guards the executors IdentityHashMap.
+ */
+ private final ReentrantLock lock = new ReentrantLock();
+
+ /**
+ * This condition allows us to cleanly terminate this executor
+ * service.
+ */
+ private final Condition terminating = lock.newCondition();
+
+ /**
+ * Whenever a new StripedObject is submitted to the pool, it
+ * is added to this IdentityHashMap. As soon as the
+ * SerialExecutor is empty, the entry is removed from the map,
+ * in order to avoid a memory leak.
+ */
+ private final Map<Object, SerialExecutor> executors = new IdentityHashMap<>();
+
+ /**
+ * The default submit() method creates a new FutureTask and
+ * wraps our StripedRunnable with it. We thus need to
+ * remember the stripe object somewhere. In our case, we will
+ * do this inside the ThreadLocal "stripes". Before the
+ * thread returns from submitting the runnable, it will always
+ * remove the thread local entry.
+ */
+ private static final ThreadLocal<Object> stripes = new ThreadLocal<>();
+
+ /**
+ * Valid states are RUNNING and SHUTDOWN. We rely on the
+ * underlying executor service for the remaining states.
+ */
+ private State state = State.RUNNING;
+
+ private enum State {
+ RUNNING, SHUTDOWN
+ }
+
+ /**
+ * Take care using this constructor. The original visibility was private
+ * since users should not shutdown their executors directly,
+ * otherwise jobs might get stuck in our queues. Do not shutdown the executor
+ * passed in.
+ *
+ * @param executor the executor service that we use to execute
+ * the tasks
+ */
+ public StripedExecutorService(ExecutorService executor) {
+ this.executor = executor;
+ }
+
+ /**
+ * This constructs a StripedExecutorService that wraps a
+ * cached thread pool.
+ */
+ public StripedExecutorService() {
+ this(Executors.newCachedThreadPool());
+ }
+
+ /**
+ * This constructs a StripedExecutorService that wraps a fixed
+ * thread pool with the given number of threads.
+ */
+ public StripedExecutorService(int numberOfThreads) {
+ this(Executors.newFixedThreadPool(numberOfThreads));
+ }
+
+ /**
+ * If the runnable also implements StripedObject, we store the
+ * stripe object in a thread local, since the actual runnable
+ * will be wrapped with a FutureTask.
+ */
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+ saveStripedObject(runnable);
+ return super.newTaskFor(runnable, value);
+ }
+
+ /**
+ * If the callable also implements StripedObject, we store the
+ * stripe object in a thread local, since the actual callable
+ * will be wrapped with a FutureTask.
+ */
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+ saveStripedObject(callable);
+ return super.newTaskFor(callable);
+ }
+
+ /**
+ * Saves the stripe in a ThreadLocal until we can use it to
+ * schedule the task into our pool.
+ */
+ private void saveStripedObject(Object task) {
+ if (isStripedObject(task)) {
+ stripes.set(((StripedObject) task).getStripe());
+ }
+ }
+
+ /**
+ * Returns true if the object implements the StripedObject
+ * interface.
+ */
+ private static boolean isStripedObject(Object o) {
+ return o instanceof StripedObject;
+ }
+
+ /**
+ * Delegates the call to submit(task, null).
+ */
+ public Future<?> submit(Runnable task) {
+ return submit(task, null);
+ }
+
+ /**
+ * If the task is a StripedObject, we execute it in-order by
+ * its stripe, otherwise we submit it directly to the wrapped
+ * executor. If the pool is not running, we throw a
+ * RejectedExecutionException.
+ */
+ public <T> Future<T> submit(Runnable task, T result) {
+ lock.lock();
+ try {
+ checkPoolIsRunning();
+ if (isStripedObject(task)) {
+ return super.submit(task, result);
+ } else { // bypass the serial executors
+ return executor.submit(task, result);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * If the task is a StripedObject, we execute it in-order by
+ * its stripe, otherwise we submit it directly to the wrapped
+ * executor. If the pool is not running, we throw a
+ * RejectedExecutionException.
+ */
+ public <T> Future<T> submit(Callable<T> task) {
+ lock.lock();
+ try {
+ checkPoolIsRunning();
+ if (isStripedObject(task)) {
+ return super.submit(task);
+ } else { // bypass the serial executors
+ return executor.submit(task);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Throws a RejectedExecutionException if the state is not
+ * RUNNING.
+ */
+ private void checkPoolIsRunning() {
+ assert lock.isHeldByCurrentThread();
+ if (state != State.RUNNING) {
+ throw new RejectedExecutionException("executor not running");
+ }
+ }
+
+ /**
+ * Executes the command. If command implements StripedObject,
+ * we execute it with a SerialExecutor. This method can be
+ * called directly by clients or it may be called by the
+ * AbstractExecutorService's submit() methods. In that case,
+ * we check whether the stripes thread local has been set. If
+ * it is, we remove it and use it to determine the
+ * StripedObject and execute it with a SerialExecutor. If no
+ * StripedObject is set, we instead pass the command to the
+ * wrapped ExecutorService directly.
+ */
+ public void execute(Runnable command) {
+ lock.lock();
+ try {
+ checkPoolIsRunning();
+ Object stripe = getStripe(command);
+ if (stripe != null) {
+ SerialExecutor ser_exec = executors.get(stripe);
+ if (ser_exec == null) {
+ executors.put(stripe, ser_exec = new SerialExecutor(stripe));
+ }
+ ser_exec.execute(command);
+ } else {
+ executor.execute(command);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * We get the stripe object either from the Runnable if it
+ * also implements StripedObject, or otherwise from the thread
+ * local temporary storage. Result may be null.
+ */
+ private Object getStripe(Runnable command) {
+ Object stripe;
+ if (command instanceof StripedObject) {
+ stripe = (((StripedObject) command).getStripe());
+ } else {
+ stripe = stripes.get();
+ }
+ stripes.remove();
+ return stripe;
+ }
+
+ /**
+ * Shuts down the StripedExecutorService. No more tasks will
+ * be submitted. If the map of SerialExecutors is empty, we
+ * shut down the wrapped executor.
+ */
+ public void shutdown() {
+ lock.lock();
+ try {
+ state = State.SHUTDOWN;
+ if (executors.isEmpty()) {
+ executor.shutdown();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * All the tasks in each of the SerialExecutors are drained
+ * to a list, as well as the tasks inside the wrapped
+ * ExecutorService. This is then returned to the user. Also,
+ * the shutdownNow method of the wrapped executor is called.
+ */
+ public List<Runnable> shutdownNow() {
+ lock.lock();
+ try {
+ shutdown();
+ List<Runnable> result = new ArrayList<>();
+ for (SerialExecutor ser_ex : executors.values()) {
+ ser_ex.tasks.drainTo(result);
+ }
+ result.addAll(executor.shutdownNow());
+ return result;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns true if shutdown() or shutdownNow() have been
+ * called; false otherwise.
+ */
+ public boolean isShutdown() {
+ lock.lock();
+ try {
+ return state == State.SHUTDOWN;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns true if this pool has been terminated, that is, all
+ * the SerialExecutors are empty and the wrapped
+ * ExecutorService has been terminated.
+ */
+ public boolean isTerminated() {
+ lock.lock();
+ try {
+ if (state == State.RUNNING)
+ return false;
+ for (SerialExecutor executor : executors.values()) {
+ if (!executor.isEmpty())
+ return false;
+ }
+ return executor.isTerminated();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns true if the wrapped ExecutorService terminates
+ * within the allotted amount of time.
+ */
+ public boolean awaitTermination(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ lock.lock();
+ try {
+ long waitUntil = System.nanoTime() + unit.toNanos(timeout);
+ long remainingTime;
+ while ((remainingTime = waitUntil - System.nanoTime()) > 0
+ && !executors.isEmpty()) {
+ terminating.awaitNanos(remainingTime);
+ }
+ if (remainingTime <= 0)
+ return false;
+ if (executors.isEmpty()) {
+ return executor.awaitTermination(
+ remainingTime, TimeUnit.NANOSECONDS);
+ }
+ return false;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * As soon as a SerialExecutor is empty, we remove it from the
+ * executors map. We might thus remove the SerialExecutors
+ * more quickly than necessary, but at least we can avoid a
+ * memory leak.
+ */
+ private void removeEmptySerialExecutor(Object stripe, SerialExecutor ser_ex) {
+ assert ser_ex == executors.get(stripe);
+ assert lock.isHeldByCurrentThread();
+ assert ser_ex.isEmpty();
+
+ executors.remove(stripe);
+ terminating.signalAll();
+ if (state == State.SHUTDOWN && executors.isEmpty()) {
+ executor.shutdown();
+ }
+ }
+
+ /**
+ * Prints information about current state of this executor, the
+ * wrapped executor and the serial executors.
+ */
+ public String toString() {
+ lock.lock();
+ try {
+ return "StripedExecutorService: state=" + state + ", " +
+ "executor=" + executor + ", " +
+ "serialExecutors=" + executors;
+ } finally {
+ lock.unlock();
+ }
+
+ }
+
+ /**
+ * This field is used for conditional compilation. If it is
+ * false, then the finalize method is an empty method, in
+ * which case the SerialExecutor will not be registered with
+ * the Finalizer.
+ */
+ private static boolean DEBUG = false;
Review comment:
This can be `final`.
##########
File path:
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/StripedExecutorService.java
##########
@@ -0,0 +1,522 @@
+/*
+ * Copyright (C) 2000-2012 Heinz Max Kabutz
+ *
+ * See the NOTICE file distributed with this work for additional
+ * information regarding copyright ownership. Heinz Max Kabutz licenses
+ * this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * The StripedExecutorService accepts Runnable/Callable objects
+ * that also implement the StripedObject interface. It executes
+ * all the tasks for a single "stripe" consecutively.
+ * <p/>
+ * In this version, submitted tasks do not necessarily have to
+ * implement the StripedObject interface. If they do not, then
+ * they will simply be passed onto the wrapped ExecutorService
+ * directly.
+ * <p/>
+ * Idea inspired by Glenn McGregor on the Concurrency-interest
+ * mailing list and using the SerialExecutor presented in the
+ * Executor interface's JavaDocs.
+ * <p/>
+ * http://cs.oswego.edu/mailman/listinfo/concurrency-interest
+ *
+ * @author Dr Heinz M. Kabutz
+ */
+public class StripedExecutorService extends AbstractExecutorService {
+ /**
+ * The wrapped ExecutorService that will actually execute our
+ * tasks.
+ */
+ private final ExecutorService executor;
+
+ /**
+ * The lock prevents shutdown from being called in the middle
+ * of a submit. It also guards the executors IdentityHashMap.
+ */
+ private final ReentrantLock lock = new ReentrantLock();
+
+ /**
+ * This condition allows us to cleanly terminate this executor
+ * service.
+ */
+ private final Condition terminating = lock.newCondition();
+
+ /**
+ * Whenever a new StripedObject is submitted to the pool, it
+ * is added to this IdentityHashMap. As soon as the
+ * SerialExecutor is empty, the entry is removed from the map,
+ * in order to avoid a memory leak.
+ */
+ private final Map<Object, SerialExecutor> executors = new IdentityHashMap<>();
+
+ /**
+ * The default submit() method creates a new FutureTask and
+ * wraps our StripedRunnable with it. We thus need to
+ * remember the stripe object somewhere. In our case, we will
+ * do this inside the ThreadLocal "stripes". Before the
+ * thread returns from submitting the runnable, it will always
+ * remove the thread local entry.
+ */
+ private static final ThreadLocal<Object> stripes = new ThreadLocal<>();
+
+ /**
+ * Valid states are RUNNING and SHUTDOWN. We rely on the
+ * underlying executor service for the remaining states.
+ */
+ private State state = State.RUNNING;
+
+ private enum State {
+ RUNNING, SHUTDOWN
+ }
+
+ /**
+ * Take care using this constructor. The original visibility was private
+ * since users should not shutdown their executors directly,
+ * otherwise jobs might get stuck in our queues. Do not shutdown the executor
+ * passed in.
+ *
+ * @param executor the executor service that we use to execute
+ * the tasks
+ */
+ public StripedExecutorService(ExecutorService executor) {
+ this.executor = executor;
+ }
+
+ /**
+ * This constructs a StripedExecutorService that wraps a
+ * cached thread pool.
+ */
+ public StripedExecutorService() {
+ this(Executors.newCachedThreadPool());
+ }
+
+ /**
+ * This constructs a StripedExecutorService that wraps a fixed
+ * thread pool with the given number of threads.
+ */
+ public StripedExecutorService(int numberOfThreads) {
+ this(Executors.newFixedThreadPool(numberOfThreads));
+ }
+
+ /**
+ * If the runnable also implements StripedObject, we store the
+ * stripe object in a thread local, since the actual runnable
+ * will be wrapped with a FutureTask.
+ */
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+ saveStripedObject(runnable);
+ return super.newTaskFor(runnable, value);
+ }
+
+ /**
+ * If the callable also implements StripedObject, we store the
+ * stripe object in a thread local, since the actual callable
+ * will be wrapped with a FutureTask.
+ */
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+ saveStripedObject(callable);
+ return super.newTaskFor(callable);
+ }
+
+ /**
+ * Saves the stripe in a ThreadLocal until we can use it to
+ * schedule the task into our pool.
+ */
+ private void saveStripedObject(Object task) {
+ if (isStripedObject(task)) {
+ stripes.set(((StripedObject) task).getStripe());
+ }
+ }
+
+ /**
+ * Returns true if the object implements the StripedObject
+ * interface.
+ */
+ private static boolean isStripedObject(Object o) {
+ return o instanceof StripedObject;
+ }
+
+ /**
+ * Delegates the call to submit(task, null).
+ */
+ public Future<?> submit(Runnable task) {
+ return submit(task, null);
+ }
+
+ /**
+ * If the task is a StripedObject, we execute it in-order by
+ * its stripe, otherwise we submit it directly to the wrapped
+ * executor. If the pool is not running, we throw a
+ * RejectedExecutionException.
+ */
+ public <T> Future<T> submit(Runnable task, T result) {
+ lock.lock();
+ try {
+ checkPoolIsRunning();
+ if (isStripedObject(task)) {
+ return super.submit(task, result);
+ } else { // bypass the serial executors
+ return executor.submit(task, result);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * If the task is a StripedObject, we execute it in-order by
+ * its stripe, otherwise we submit it directly to the wrapped
+ * executor. If the pool is not running, we throw a
+ * RejectedExecutionException.
+ */
+ public <T> Future<T> submit(Callable<T> task) {
+ lock.lock();
+ try {
+ checkPoolIsRunning();
+ if (isStripedObject(task)) {
+ return super.submit(task);
+ } else { // bypass the serial executors
+ return executor.submit(task);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Throws a RejectedExecutionException if the state is not
+ * RUNNING.
+ */
+ private void checkPoolIsRunning() {
+ assert lock.isHeldByCurrentThread();
+ if (state != State.RUNNING) {
+ throw new RejectedExecutionException("executor not running");
+ }
+ }
+
+ /**
+ * Executes the command. If command implements StripedObject,
+ * we execute it with a SerialExecutor. This method can be
+ * called directly by clients or it may be called by the
+ * AbstractExecutorService's submit() methods. In that case,
+ * we check whether the stripes thread local has been set. If
+ * it is, we remove it and use it to determine the
+ * StripedObject and execute it with a SerialExecutor. If no
+ * StripedObject is set, we instead pass the command to the
+ * wrapped ExecutorService directly.
+ */
+ public void execute(Runnable command) {
+ lock.lock();
+ try {
+ checkPoolIsRunning();
+ Object stripe = getStripe(command);
+ if (stripe != null) {
+ SerialExecutor ser_exec = executors.get(stripe);
+ if (ser_exec == null) {
+ executors.put(stripe, ser_exec = new SerialExecutor(stripe));
+ }
+ ser_exec.execute(command);
+ } else {
+ executor.execute(command);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * We get the stripe object either from the Runnable if it
+ * also implements StripedObject, or otherwise from the thread
+ * local temporary storage. Result may be null.
+ */
+ private Object getStripe(Runnable command) {
+ Object stripe;
+ if (command instanceof StripedObject) {
+ stripe = (((StripedObject) command).getStripe());
+ } else {
+ stripe = stripes.get();
+ }
+ stripes.remove();
+ return stripe;
+ }
+
+ /**
+ * Shuts down the StripedExecutorService. No more tasks will
+ * be submitted. If the map of SerialExecutors is empty, we
+ * shut down the wrapped executor.
+ */
+ public void shutdown() {
+ lock.lock();
+ try {
+ state = State.SHUTDOWN;
+ if (executors.isEmpty()) {
+ executor.shutdown();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * All the tasks in each of the SerialExecutors are drained
+ * to a list, as well as the tasks inside the wrapped
+ * ExecutorService. This is then returned to the user. Also,
+ * the shutdownNow method of the wrapped executor is called.
+ */
+ public List<Runnable> shutdownNow() {
+ lock.lock();
+ try {
+ shutdown();
+ List<Runnable> result = new ArrayList<>();
+ for (SerialExecutor ser_ex : executors.values()) {
+ ser_ex.tasks.drainTo(result);
+ }
+ result.addAll(executor.shutdownNow());
+ return result;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns true if shutdown() or shutdownNow() have been
+ * called; false otherwise.
+ */
+ public boolean isShutdown() {
+ lock.lock();
+ try {
+ return state == State.SHUTDOWN;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns true if this pool has been terminated, that is, all
+ * the SerialExecutors are empty and the wrapped
+ * ExecutorService has been terminated.
+ */
+ public boolean isTerminated() {
+ lock.lock();
+ try {
+ if (state == State.RUNNING)
+ return false;
+ for (SerialExecutor executor : executors.values()) {
+ if (!executor.isEmpty())
+ return false;
+ }
+ return executor.isTerminated();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns true if the wrapped ExecutorService terminates
+ * within the allotted amount of time.
+ */
+ public boolean awaitTermination(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ lock.lock();
+ try {
+ long waitUntil = System.nanoTime() + unit.toNanos(timeout);
+ long remainingTime;
+ while ((remainingTime = waitUntil - System.nanoTime()) > 0
+ && !executors.isEmpty()) {
+ terminating.awaitNanos(remainingTime);
+ }
+ if (remainingTime <= 0)
+ return false;
Review comment:
This if statement should have curly braces.
##########
File path:
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/StripedExecutorService.java
##########
@@ -0,0 +1,522 @@
+/*
+ * Copyright (C) 2000-2012 Heinz Max Kabutz
+ *
+ * See the NOTICE file distributed with this work for additional
+ * information regarding copyright ownership. Heinz Max Kabutz licenses
+ * this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * The StripedExecutorService accepts Runnable/Callable objects
+ * that also implement the StripedObject interface. It executes
+ * all the tasks for a single "stripe" consecutively.
+ * <p/>
+ * In this version, submitted tasks do not necessarily have to
+ * implement the StripedObject interface. If they do not, then
+ * they will simply be passed onto the wrapped ExecutorService
+ * directly.
+ * <p/>
+ * Idea inspired by Glenn McGregor on the Concurrency-interest
+ * mailing list and using the SerialExecutor presented in the
+ * Executor interface's JavaDocs.
+ * <p/>
+ * http://cs.oswego.edu/mailman/listinfo/concurrency-interest
+ *
+ * @author Dr Heinz M. Kabutz
+ */
+public class StripedExecutorService extends AbstractExecutorService {
+ /**
+ * The wrapped ExecutorService that will actually execute our
+ * tasks.
+ */
+ private final ExecutorService executor;
+
+ /**
+ * The lock prevents shutdown from being called in the middle
+ * of a submit. It also guards the executors IdentityHashMap.
+ */
+ private final ReentrantLock lock = new ReentrantLock();
+
+ /**
+ * This condition allows us to cleanly terminate this executor
+ * service.
+ */
+ private final Condition terminating = lock.newCondition();
+
+ /**
+ * Whenever a new StripedObject is submitted to the pool, it
+ * is added to this IdentityHashMap. As soon as the
+ * SerialExecutor is empty, the entry is removed from the map,
+ * in order to avoid a memory leak.
+ */
+ private final Map<Object, SerialExecutor> executors = new IdentityHashMap<>();
+
+ /**
+ * The default submit() method creates a new FutureTask and
+ * wraps our StripedRunnable with it. We thus need to
+ * remember the stripe object somewhere. In our case, we will
+ * do this inside the ThreadLocal "stripes". Before the
+ * thread returns from submitting the runnable, it will always
+ * remove the thread local entry.
+ */
+ private static final ThreadLocal<Object> stripes = new ThreadLocal<>();
+
+ /**
+ * Valid states are RUNNING and SHUTDOWN. We rely on the
+ * underlying executor service for the remaining states.
+ */
+ private State state = State.RUNNING;
+
+ private enum State {
+ RUNNING, SHUTDOWN
+ }
+
+ /**
+ * Take care using this constructor. The original visibility was private
+ * since users should not shutdown their executors directly,
+ * otherwise jobs might get stuck in our queues. Do not shutdown the executor
+ * passed in.
+ *
+ * @param executor the executor service that we use to execute
+ * the tasks
+ */
+ public StripedExecutorService(ExecutorService executor) {
+ this.executor = executor;
+ }
+
+ /**
+ * This constructs a StripedExecutorService that wraps a
+ * cached thread pool.
+ */
+ public StripedExecutorService() {
+ this(Executors.newCachedThreadPool());
+ }
+
+ /**
+ * This constructs a StripedExecutorService that wraps a fixed
+ * thread pool with the given number of threads.
+ */
+ public StripedExecutorService(int numberOfThreads) {
+ this(Executors.newFixedThreadPool(numberOfThreads));
+ }
+
+ /**
+ * If the runnable also implements StripedObject, we store the
+ * stripe object in a thread local, since the actual runnable
+ * will be wrapped with a FutureTask.
+ */
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+ saveStripedObject(runnable);
+ return super.newTaskFor(runnable, value);
+ }
+
+ /**
+ * If the callable also implements StripedObject, we store the
+ * stripe object in a thread local, since the actual callable
+ * will be wrapped with a FutureTask.
+ */
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+ saveStripedObject(callable);
+ return super.newTaskFor(callable);
+ }
+
+ /**
+ * Saves the stripe in a ThreadLocal until we can use it to
+ * schedule the task into our pool.
+ */
+ private void saveStripedObject(Object task) {
+ if (isStripedObject(task)) {
+ stripes.set(((StripedObject) task).getStripe());
+ }
+ }
+
+ /**
+ * Returns true if the object implements the StripedObject
+ * interface.
+ */
+ private static boolean isStripedObject(Object o) {
+ return o instanceof StripedObject;
+ }
+
+ /**
+ * Delegates the call to submit(task, null).
+ */
+ public Future<?> submit(Runnable task) {
+ return submit(task, null);
+ }
+
+ /**
+ * If the task is a StripedObject, we execute it in-order by
+ * its stripe, otherwise we submit it directly to the wrapped
+ * executor. If the pool is not running, we throw a
+ * RejectedExecutionException.
+ */
+ public <T> Future<T> submit(Runnable task, T result) {
+ lock.lock();
+ try {
+ checkPoolIsRunning();
+ if (isStripedObject(task)) {
+ return super.submit(task, result);
+ } else { // bypass the serial executors
+ return executor.submit(task, result);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * If the task is a StripedObject, we execute it in-order by
+ * its stripe, otherwise we submit it directly to the wrapped
+ * executor. If the pool is not running, we throw a
+ * RejectedExecutionException.
+ */
+ public <T> Future<T> submit(Callable<T> task) {
+ lock.lock();
+ try {
+ checkPoolIsRunning();
+ if (isStripedObject(task)) {
+ return super.submit(task);
+ } else { // bypass the serial executors
+ return executor.submit(task);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Throws a RejectedExecutionException if the state is not
+ * RUNNING.
+ */
+ private void checkPoolIsRunning() {
+ assert lock.isHeldByCurrentThread();
+ if (state != State.RUNNING) {
+ throw new RejectedExecutionException("executor not running");
+ }
+ }
+
+ /**
+ * Executes the command. If command implements StripedObject,
+ * we execute it with a SerialExecutor. This method can be
+ * called directly by clients or it may be called by the
+ * AbstractExecutorService's submit() methods. In that case,
+ * we check whether the stripes thread local has been set. If
+ * it is, we remove it and use it to determine the
+ * StripedObject and execute it with a SerialExecutor. If no
+ * StripedObject is set, we instead pass the command to the
+ * wrapped ExecutorService directly.
+ */
+ public void execute(Runnable command) {
+ lock.lock();
+ try {
+ checkPoolIsRunning();
+ Object stripe = getStripe(command);
+ if (stripe != null) {
+ SerialExecutor ser_exec = executors.get(stripe);
+ if (ser_exec == null) {
+ executors.put(stripe, ser_exec = new SerialExecutor(stripe));
+ }
+ ser_exec.execute(command);
+ } else {
+ executor.execute(command);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * We get the stripe object either from the Runnable if it
+ * also implements StripedObject, or otherwise from the thread
+ * local temporary storage. Result may be null.
+ */
+ private Object getStripe(Runnable command) {
+ Object stripe;
+ if (command instanceof StripedObject) {
+ stripe = (((StripedObject) command).getStripe());
+ } else {
+ stripe = stripes.get();
+ }
+ stripes.remove();
+ return stripe;
+ }
+
+ /**
+ * Shuts down the StripedExecutorService. No more tasks will
+ * be submitted. If the map of SerialExecutors is empty, we
+ * shut down the wrapped executor.
+ */
+ public void shutdown() {
+ lock.lock();
+ try {
+ state = State.SHUTDOWN;
+ if (executors.isEmpty()) {
+ executor.shutdown();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * All the tasks in each of the SerialExecutors are drained
+ * to a list, as well as the tasks inside the wrapped
+ * ExecutorService. This is then returned to the user. Also,
+ * the shutdownNow method of the wrapped executor is called.
+ */
+ public List<Runnable> shutdownNow() {
+ lock.lock();
+ try {
+ shutdown();
+ List<Runnable> result = new ArrayList<>();
+ for (SerialExecutor ser_ex : executors.values()) {
+ ser_ex.tasks.drainTo(result);
+ }
+ result.addAll(executor.shutdownNow());
+ return result;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns true if shutdown() or shutdownNow() have been
+ * called; false otherwise.
+ */
+ public boolean isShutdown() {
+ lock.lock();
+ try {
+ return state == State.SHUTDOWN;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns true if this pool has been terminated, that is, all
+ * the SerialExecutors are empty and the wrapped
+ * ExecutorService has been terminated.
+ */
+ public boolean isTerminated() {
+ lock.lock();
+ try {
+ if (state == State.RUNNING)
+ return false;
+ for (SerialExecutor executor : executors.values()) {
+ if (!executor.isEmpty())
+ return false;
+ }
+ return executor.isTerminated();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns true if the wrapped ExecutorService terminates
+ * within the allotted amount of time.
+ */
+ public boolean awaitTermination(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ lock.lock();
+ try {
+ long waitUntil = System.nanoTime() + unit.toNanos(timeout);
+ long remainingTime;
+ while ((remainingTime = waitUntil - System.nanoTime()) > 0
+ && !executors.isEmpty()) {
+ terminating.awaitNanos(remainingTime);
+ }
+ if (remainingTime <= 0)
+ return false;
+ if (executors.isEmpty()) {
+ return executor.awaitTermination(
+ remainingTime, TimeUnit.NANOSECONDS);
+ }
+ return false;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * As soon as a SerialExecutor is empty, we remove it from the
+ * executors map. We might thus remove the SerialExecutors
+ * more quickly than necessary, but at least we can avoid a
+ * memory leak.
+ */
+ private void removeEmptySerialExecutor(Object stripe, SerialExecutor ser_ex) {
+ assert ser_ex == executors.get(stripe);
+ assert lock.isHeldByCurrentThread();
+ assert ser_ex.isEmpty();
+
+ executors.remove(stripe);
+ terminating.signalAll();
+ if (state == State.SHUTDOWN && executors.isEmpty()) {
+ executor.shutdown();
+ }
+ }
+
+ /**
+ * Prints information about current state of this executor, the
+ * wrapped executor and the serial executors.
+ */
+ public String toString() {
+ lock.lock();
+ try {
+ return "StripedExecutorService: state=" + state + ", " +
+ "executor=" + executor + ", " +
+ "serialExecutors=" + executors;
+ } finally {
+ lock.unlock();
+ }
+
+ }
+
+ /**
+ * This field is used for conditional compilation. If it is
+ * false, then the finalize method is an empty method, in
+ * which case the SerialExecutor will not be registered with
+ * the Finalizer.
+ */
+ private static boolean DEBUG = false;
+
+ /**
+ * SerialExecutor is based on the construct with the same name
+ * described in the {@link Executor} JavaDocs. The difference
+ * with our SerialExecutor is that it can be terminated. It
+ * also removes itself automatically once the queue is empty.
+ */
+ private class SerialExecutor implements Executor {
+ /**
+ * The queue of unexecuted tasks.
+ */
+ private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
+ /**
+ * The runnable that we are currently busy with.
+ */
+ private Runnable active;
+ /**
+ * The stripe that this SerialExecutor was defined for. It
+ * is needed so that we can remove this executor from the
+ * map once it is empty.
+ */
+ private final Object stripe;
+
+ /**
+ * Creates a SerialExecutor for a particular stripe.
+ */
+ private SerialExecutor(Object stripe) {
+ this.stripe = stripe;
+ if (DEBUG) {
+ System.out.println("SerialExecutor created " + stripe);
Review comment:
Rather than the `DEBUG` boolean and a `System.out` statement, might it
be better to modify this to make use of logging?
##########
File path:
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/StripedExecutorService.java
##########
@@ -0,0 +1,522 @@
+/*
+ * Copyright (C) 2000-2012 Heinz Max Kabutz
+ *
+ * See the NOTICE file distributed with this work for additional
+ * information regarding copyright ownership. Heinz Max Kabutz licenses
+ * this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * The StripedExecutorService accepts Runnable/Callable objects
+ * that also implement the StripedObject interface. It executes
+ * all the tasks for a single "stripe" consecutively.
+ * <p>
+ * In this version, submitted tasks do not necessarily have to
+ * implement the StripedObject interface. If they do not, then
+ * they will simply be passed onto the wrapped ExecutorService
+ * directly.
+ * <p>
+ * Idea inspired by Glenn McGregor on the Concurrency-interest
+ * mailing list and using the SerialExecutor presented in the
+ * Executor interface's JavaDocs.
+ * <p>
+ * http://cs.oswego.edu/mailman/listinfo/concurrency-interest
+ *
+ * @author Dr Heinz M. Kabutz
+ */
+public class StripedExecutorService extends AbstractExecutorService {
+ /**
+ * The wrapped ExecutorService that will actually execute our
+ * tasks.
+ */
+ private final ExecutorService executor;
+
+ /**
+ * The lock prevents shutdown from being called in the middle
+ * of a submit. It also guards the executors IdentityHashMap.
+ */
+ private final ReentrantLock lock = new ReentrantLock();
+
+ /**
+ * This condition allows us to cleanly terminate this executor
+ * service.
+ * It is created from {@code lock}; awaitTermination() waits on it
+ * and removeEmptySerialExecutor() signals it.
+ */
+ private final Condition terminating = lock.newCondition();
+
+ /**
+ * Whenever a new StripedObject is submitted to the pool, it
+ * is added to this IdentityHashMap. As soon as the
+ * SerialExecutor is empty, the entry is removed from the map,
+ * in order to avoid a memory leak.
+ * Because this is an IdentityHashMap, stripes are matched by
+ * reference (==), not by equals().
+ */
+ private final Map<Object, SerialExecutor> executors = new
IdentityHashMap<>();
+
+ /**
+ * The default submit() method creates a new FutureTask and
+ * wraps our StripedRunnable with it. We thus need to
+ * remember the stripe object somewhere. In our case, we will
+ * do this inside the ThreadLocal "stripes". Before the
+ * thread returns from submitting the runnable, it will always
+ * remove the thread local entry.
+ * Note that the field is static, so a thread's entry is shared
+ * by every StripedExecutorService instance used on that thread.
+ */
+ private static final ThreadLocal<Object> stripes = new ThreadLocal<>();
+
+ /**
+ * Valid states are RUNNING and SHUTDOWN. We rely on the
+ * underlying executor service for the remaining states.
+ * The accesses in shutdown(), isShutdown() and isTerminated()
+ * are made while {@code lock} is held.
+ */
+ private State state = State.RUNNING;
+
+ /**
+ * Lifecycle states tracked by this class itself; the field
+ * starts as RUNNING and shutdown() sets it to SHUTDOWN.
+ */
+ private enum State {
+ RUNNING, SHUTDOWN
+ }
+
+ /**
+  * Creates a StripedExecutorService on top of the given executor.
+  * <p>
+  * Take care using this constructor. The original visibility was
+  * private since users should not shutdown their executors
+  * directly, otherwise jobs might get stuck in our queues. Do not
+  * shutdown the executor passed in.
+  *
+  * @param executor the executor service that we use to execute
+  *        the tasks; must not be null
+  * @throws NullPointerException if executor is null
+  */
+ public StripedExecutorService(ExecutorService executor) {
+   // Fail fast: a null executor would otherwise only surface as a
+   // NullPointerException on the first submit/execute/shutdown call.
+   if (executor == null) {
+     throw new NullPointerException("executor");
+   }
+   this.executor = executor;
+ }
+
+ /**
+ * This constructs a StripedExecutorService that wraps a
+ * cached thread pool.
+ * The pool is obtained from Executors.newCachedThreadPool() and
+ * handed to the ExecutorService-taking constructor.
+ */
+ public StripedExecutorService() {
+ this(Executors.newCachedThreadPool());
+ }
+
+ /**
+ * This constructs a StripedExecutorService that wraps a fixed
+ * thread pool with the given number of threads.
+ *
+ * @param numberOfThreads the pool size, passed unchanged to
+ * Executors.newFixedThreadPool(int)
+ * @throws IllegalArgumentException if numberOfThreads is not
+ * positive (raised by Executors.newFixedThreadPool)
+ */
+ public StripedExecutorService(int numberOfThreads) {
+ this(Executors.newFixedThreadPool(numberOfThreads));
+ }
+
+ /**
+ * If the runnable also implements StripedObject, we store the
+ * stripe object in a thread local, since the actual runnable
+ * will be wrapped with a FutureTask.
+ */
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+ saveStripedObject(runnable);
+ return super.newTaskFor(runnable, value);
+ }
+
+ /**
+ * If the callable also implements StripedObject, we store the
+ * stripe object in a thread local, since the actual callable
+ * will be wrapped with a FutureTask.
+ */
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+ saveStripedObject(callable);
+ return super.newTaskFor(callable);
+ }
+
+ /**
+ * Saves the stripe in a ThreadLocal until we can use it to
+ * schedule the task into our pool.
+ */
+ private void saveStripedObject(Object task) {
+ if (isStripedObject(task)) {
+ stripes.set(((StripedObject) task).getStripe());
+ }
+ }
+
+ /**
+ * Returns true if the object implements the StripedObject
+ * interface.
+ */
+ private static boolean isStripedObject(Object o) {
+ return o instanceof StripedObject;
+ }
+
+ /**
+  * Submits the task with a null result by forwarding to
+  * submit(task, null).
+  */
+ public Future<?> submit(Runnable task) {
+   final Future<?> future = submit(task, null);
+   return future;
+ }
+
+ /**
+  * Submits a runnable together with the result its future should
+  * yield. Non-striped tasks bypass the serial executors and go
+  * straight to the wrapped executor; striped tasks take the
+  * superclass path, which ends up in execute() and runs them
+  * in-order by stripe. A RejectedExecutionException is thrown if
+  * the pool is not running.
+  */
+ public <T> Future<T> submit(Runnable task, T result) {
+   lock.lock();
+   try {
+     checkPoolIsRunning();
+     if (!isStripedObject(task)) {
+       // bypass the serial executors
+       return executor.submit(task, result);
+     }
+     return super.submit(task, result);
+   } finally {
+     lock.unlock();
+   }
+ }
+
+ /**
+  * Submits a callable. Non-striped tasks bypass the serial
+  * executors and go straight to the wrapped executor; striped
+  * tasks take the superclass path, which ends up in execute()
+  * and runs them in-order by stripe. A
+  * RejectedExecutionException is thrown if the pool is not
+  * running.
+  */
+ public <T> Future<T> submit(Callable<T> task) {
+   lock.lock();
+   try {
+     checkPoolIsRunning();
+     if (!isStripedObject(task)) {
+       // bypass the serial executors
+       return executor.submit(task);
+     }
+     return super.submit(task);
+   } finally {
+     lock.unlock();
+   }
+ }
+
+ /**
+  * Rejects the current operation with a
+  * RejectedExecutionException unless the state is RUNNING. The
+  * caller is expected to hold the lock (checked by assertion).
+  */
+ private void checkPoolIsRunning() {
+   assert lock.isHeldByCurrentThread();
+   if (state == State.RUNNING) {
+     return;
+   }
+   throw new RejectedExecutionException("executor not running");
+ }
+
+ /**
+  * Executes the command. If command implements StripedObject,
+  * we execute it with a SerialExecutor. This method can be
+  * called directly by clients or it may be called by the
+  * AbstractExecutorService's submit() methods. In that case,
+  * we check whether the stripes thread local has been set. If
+  * it is, we remove it and use it to determine the
+  * StripedObject and execute it with a SerialExecutor. If no
+  * StripedObject is set, we instead pass the command to the
+  * wrapped ExecutorService directly.
+  *
+  * @param command the task to run
+  * @throws RejectedExecutionException if the pool is not running
+  */
+ public void execute(Runnable command) {
+   lock.lock();
+   try {
+     // Resolve the stripe before the state check: getStripe() always
+     // clears the static "stripes" thread local. If the check ran first
+     // and rejected the command, a stripe stored by newTaskFor() (for
+     // example via the inherited invokeAll()) would stay on the calling
+     // thread and could be attached to a later, unrelated command.
+     final Object stripe = getStripe(command);
+     checkPoolIsRunning();
+     if (stripe == null) {
+       executor.execute(command);
+       return;
+     }
+     SerialExecutor ser_exec = executors.get(stripe);
+     if (ser_exec == null) {
+       // First task for this stripe: create its serial executor.
+       ser_exec = new SerialExecutor(stripe);
+       executors.put(stripe, ser_exec);
+     }
+     ser_exec.execute(command);
+   } finally {
+     lock.unlock();
+   }
+ }
+
+ /**
+ * We get the stripe object either from the Runnable if it
+ * also implements StripedObject, or otherwise from the thread
+ * local temporary storage. Result may be null.
+ */
+ private Object getStripe(Runnable command) {
+ Object stripe;
+ if (command instanceof StripedObject) {
+ stripe = (((StripedObject) command).getStripe());
+ } else {
+ stripe = stripes.get();
+ }
+ stripes.remove();
+ return stripe;
+ }
+
+ /**
+  * Moves this service to the SHUTDOWN state so that no further
+  * tasks are accepted. The wrapped executor is shut down right
+  * away only if no SerialExecutors are left in the map.
+  */
+ public void shutdown() {
+   lock.lock();
+   try {
+     state = State.SHUTDOWN;
+     if (!executors.isEmpty()) {
+       return;
+     }
+     executor.shutdown();
+   } finally {
+     lock.unlock();
+   }
+ }
+
+ /**
+  * Shuts down and hands back every task that has not started:
+  * the queued tasks of each SerialExecutor are drained into a
+  * list, followed by whatever the wrapped executor's
+  * shutdownNow() returns.
+  */
+ public List<Runnable> shutdownNow() {
+   lock.lock();
+   try {
+     shutdown();
+     final List<Runnable> pending = new ArrayList<>();
+     executors.values().forEach(serial -> serial.tasks.drainTo(pending));
+     final List<Runnable> fromWrapped = executor.shutdownNow();
+     pending.addAll(fromWrapped);
+     return pending;
+   } finally {
+     lock.unlock();
+   }
+ }
+
+ /**
+  * Reports whether the state is SHUTDOWN, i.e. whether
+  * shutdown() or shutdownNow() has been called. The state is
+  * read while holding the lock.
+  */
+ public boolean isShutdown() {
+   lock.lock();
+   try {
+     final boolean shutDown = (State.SHUTDOWN == state);
+     return shutDown;
+   } finally {
+     lock.unlock();
+   }
+ }
+
+ /**
+  * Reports whether this pool has terminated: it must no longer
+  * be RUNNING, every SerialExecutor must be empty, and the
+  * wrapped ExecutorService must itself be terminated.
+  */
+ public boolean isTerminated() {
+   lock.lock();
+   try {
+     if (state != State.RUNNING) {
+       for (SerialExecutor serial : executors.values()) {
+         if (!serial.isEmpty()) {
+           return false;
+         }
+       }
+       return executor.isTerminated();
+     }
+     return false;
+   } finally {
+     lock.unlock();
+   }
+ }
+
+ /**
+  * Waits until all SerialExecutors have been removed and the
+  * wrapped ExecutorService has terminated, or until the timeout
+  * elapses, whichever comes first.
+  *
+  * @param timeout the maximum time to wait
+  * @param unit the unit of the timeout argument
+  * @return true if the wrapped ExecutorService terminated within
+  *         the allotted time; false if the time ran out first
+  * @throws InterruptedException if interrupted while waiting
+  */
+ public boolean awaitTermination(long timeout, TimeUnit unit)
+     throws InterruptedException {
+   long remainingTime;
+   lock.lock();
+   try {
+     final long waitUntil = System.nanoTime() + unit.toNanos(timeout);
+     // Wait (releasing the lock while parked) until every
+     // SerialExecutor has removed itself or time runs out.
+     while ((remainingTime = waitUntil - System.nanoTime()) > 0
+         && !executors.isEmpty()) {
+       terminating.awaitNanos(remainingTime);
+     }
+     if (remainingTime <= 0 || !executors.isEmpty()) {
+       return false;
+     }
+   } finally {
+     lock.unlock();
+   }
+   // Delegate only after the lock is released: waiting on the wrapped
+   // executor while holding it would block shutdown(), submit(),
+   // isShutdown() and toString() on other threads for up to the
+   // whole remaining timeout.
+   return executor.awaitTermination(remainingTime, TimeUnit.NANOSECONDS);
+ }
+
+ /**
+ * As soon as a SerialExecutor is empty, we remove it from the
+ * executors map. We might thus remove the SerialExecutors
+ * more quickly than necessary, but at least we can avoid a
+ * memory leak.
+ */
+ private void removeEmptySerialExecutor(Object stripe, SerialExecutor ser_ex)
{
+ assert ser_ex == executors.get(stripe);
+ assert lock.isHeldByCurrentThread();
+ assert ser_ex.isEmpty();
Review comment:
Is it common for users to run Geode with assertions enabled? Since
they're not enabled by default, might it be safer to use a slightly different
approach here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]