[FLINK-3384] [kafka] Add ClosableQueue for message exchanges between Kafka Threads
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd324ea7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd324ea7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd324ea7 Branch: refs/heads/master Commit: fd324ea72979cc3d4202ffa3ea174ec4cc9d153b Parents: 50bd65a Author: Stephan Ewen <se...@apache.org> Authored: Wed Feb 10 14:51:10 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Feb 10 22:15:32 2016 +0100 ---------------------------------------------------------------------- .../kafka/internals/ClosableBlockingQueue.java | 502 +++++++++++++++ .../internals/ClosableBlockingQueueTest.java | 603 +++++++++++++++++++ 2 files changed, 1105 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fd324ea7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java new file mode 100644 index 0000000..856c2ad --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java @@ -0,0 +1,502 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import static java.util.Objects.requireNonNull; + +/** + * A special form of blocking queue with two additions: + * <ol> + * <li>The queue can be closed atomically when empty. Adding elements after the queue + * is closed fails. This allows queue consumers to atomically discover that no elements + * are available and mark themselves as shut down.</li> + * <li>The queue allows to poll batches of elements in one polling call.</li> + * </ol> + * + * The queue has no capacity restriction and is safe for multiple producers and consumers. + * + * <p>Note: Null elements are prohibited. + * + * @param <E> The type of elements in the queue. + */ +public class ClosableBlockingQueue<E> { + + /** The lock used to make queue accesses and open checks atomic */ + private final ReentrantLock lock; + + /** The condition on which blocking get-calls wait if the queue is empty */ + private final Condition nonEmpty; + + /** The deque of elements */ + private final ArrayDeque<E> elements; + + /** Flag marking the status of the queue */ + private volatile boolean open; + + // ------------------------------------------------------------------------ + + /** + * Creates a new empty queue. + */ + public ClosableBlockingQueue() { + this(10); + } + + /** + * Creates a new empty queue, reserving space for at least the specified number + * of elements. The queu can still grow, of more elements are added than the + * reserved space. + * + * @param initialSize The number of elements to reserve space for. + */ + public ClosableBlockingQueue(int initialSize) { + this.lock = new ReentrantLock(true); + this.nonEmpty = this.lock.newCondition(); + + this.elements = new ArrayDeque<>(initialSize); + this.open = true; + + + } + + /** + * Creates a new queue that contains the given elements. + * + * @param initialElements The elements to initially add to the queue. + */ + public ClosableBlockingQueue(Collection<? extends E> initialElements) { + this(initialElements.size()); + this.elements.addAll(initialElements); + } + + // ------------------------------------------------------------------------ + // Size and status + // ------------------------------------------------------------------------ + + /** + * Gets the number of elements currently in the queue. + * @return The number of elements currently in the queue. + */ + public int size() { + return elements.size(); + } + + /** + * Checks whether the queue is empty (has no elements). + * @return True, if the queue is empty; false, if it is non-empty. + */ + public boolean isEmpty() { + return size() == 0; + } + + /** + * Checks whether the queue is currently open, meaning elements can be added and polled. + * @return True, if the queue is open; false, if it is closed. + */ + public boolean isOpen() { + return open; + } + + /** + * Tries to close the queue. Closing the queue only succeeds when no elements are + * in the queue when this method is called. Checking whether the queue is empty, and + * marking the queue as closed is one atomic operation. + * + * @return True, if the queue is closed, false if the queue remains open. + */ + public boolean close() { + lock.lock(); + try { + if (open) { + if (elements.isEmpty()) { + open = false; + nonEmpty.signalAll(); + return true; + } else { + return false; + } + } + else { + // already closed + return true; + } + } finally { + lock.unlock(); + } + } + + // ------------------------------------------------------------------------ + // Adding / Removing elements + // ------------------------------------------------------------------------ + + /** + * Tries to add an element to the queue, if the queue is still open. Checking whether the queue + * is open and adding the element is one atomic operation. + * + * <p>Unlike the {@link #add(Object)} method, this method never throws an exception, + * but only indicates via the return code if the element was added or the + * queue was closed. + * + * @param element The element to add. + * @return True, if the element was added, false if the queue was closes. + */ + public boolean addIfOpen(E element) { + requireNonNull(element); + + lock.lock(); + try { + if (open) { + elements.addLast(element); + if (elements.size() == 1) { + nonEmpty.signalAll(); + } + } + return open; + } finally { + lock.unlock(); + } + } + + /** + * Adds the element to the queue, or fails with an exception, if the queue is closed. + * Checking whether the queue is open and adding the element is one atomic operation. + * + * @param element The element to add. + * @throws IllegalStateException Thrown, if the queue is closed. + */ + public void add(E element) throws IllegalStateException { + requireNonNull(element); + + lock.lock(); + try { + if (open) { + elements.addLast(element); + if (elements.size() == 1) { + nonEmpty.signalAll(); + } + } else { + throw new IllegalStateException("queue is closed"); + } + } finally { + lock.unlock(); + } + } + + /** + * Returns the queue's next element without removing it, if the queue is non-empty. + * Otherwise, returns null. + * + * <p>The method throws an {@code IllegalStateException} if the queue is closed. + * Checking whether the queue is open and getting the next element is one atomic operation. + * + * <p>This method never blocks. + * + * @return The queue's next element, or null, if the queue is empty. + * @throws IllegalStateException Thrown, if the queue is closed. + */ + public E peek() { + lock.lock(); + try { + if (open) { + if (elements.size() > 0) { + return elements.getFirst(); + } else { + return null; + } + } else { + throw new IllegalStateException("queue is closed"); + } + } finally { + lock.unlock(); + } + } + + /** + * Returns the queue's next element and removes it, the queue is non-empty. + * Otherwise, this method returns null. + * + * <p>The method throws an {@code IllegalStateException} if the queue is closed. + * Checking whether the queue is open and removing the next element is one atomic operation. + * + * <p>This method never blocks. + * + * @return The queue's next element, or null, if the queue is empty. + * @throws IllegalStateException Thrown, if the queue is closed. + */ + public E poll() { + lock.lock(); + try { + if (open) { + if (elements.size() > 0) { + return elements.removeFirst(); + } else { + return null; + } + } else { + throw new IllegalStateException("queue is closed"); + } + } finally { + lock.unlock(); + } + } + + /** + * Returns all of the queue's current elements in a list, if the queue is non-empty. + * Otherwise, this method returns null. + * + * <p>The method throws an {@code IllegalStateException} if the queue is closed. + * Checking whether the queue is open and removing the elements is one atomic operation. + * + * <p>This method never blocks. + * + * @return All of the queue's elements, or null, if the queue is empty. + * @throws IllegalStateException Thrown, if the queue is closed. + */ + public List<E> pollBatch() { + lock.lock(); + try { + if (open) { + if (elements.size() > 0) { + ArrayList<E> result = new ArrayList<>(elements); + elements.clear(); + return result; + } else { + return null; + } + } else { + throw new IllegalStateException("queue is closed"); + } + } finally { + lock.unlock(); + } + } + + /** + * Returns the next element in the queue. If the queue is empty, this method + * waits until at least one element is added. + * + * <p>The method throws an {@code IllegalStateException} if the queue is closed. + * Checking whether the queue is open and removing the next element is one atomic operation. + * + * @return The next element in the queue, never null. + * + * @throws IllegalStateException Thrown, if the queue is closed. + * @throws InterruptedException Throw, if the thread is interrupted while waiting for an + * element to be added. + */ + public E getElementBlocking() throws InterruptedException { + lock.lock(); + try { + while (open && elements.isEmpty()) { + nonEmpty.await(); + } + + if (open) { + return elements.removeFirst(); + } else { + throw new IllegalStateException("queue is closed"); + } + } finally { + lock.unlock(); + } + } + + /** + * Returns the next element in the queue. If the queue is empty, this method + * waits at most a certain time until an element becomes available. If no element + * is available after that time, the method returns null. + * + * <p>The method throws an {@code IllegalStateException} if the queue is closed. + * Checking whether the queue is open and removing the next element is one atomic operation. + * + * @param timeoutMillis The number of milliseconds to block, at most. + * @return The next element in the queue, or null, if the timeout expires before an element is available. + * + * @throws IllegalStateException Thrown, if the queue is closed. + * @throws InterruptedException Throw, if the thread is interrupted while waiting for an + * element to be added. + */ + public E getElementBlocking(long timeoutMillis) throws InterruptedException { + if (timeoutMillis == 0L) { + // wait forever case + return getElementBlocking(); + } else if (timeoutMillis < 0L) { + throw new IllegalArgumentException("invalid timeout"); + } + + final long deadline = System.currentTimeMillis() + timeoutMillis; + + lock.lock(); + try { + while (open && elements.isEmpty() && timeoutMillis > 0) { + nonEmpty.await(timeoutMillis, TimeUnit.MILLISECONDS); + timeoutMillis = deadline - System.currentTimeMillis(); + } + + if (!open) { + throw new IllegalStateException("queue is closed"); + } + else if (elements.isEmpty()) { + return null; + } else { + return elements.removeFirst(); + } + } finally { + lock.unlock(); + } + } + + /** + * Gets all the elements found in the list, or blocks until at least one element + * was added. If the queue is empty when this method is called, it blocks until + * at least one element is added. + * + * <p>This method always returns a list with at least one element. + * + * <p>The method throws an {@code IllegalStateException} if the queue is closed. + * Checking whether the queue is open and removing the next element is one atomic operation. + * + * @return A list with all elements in the queue, always at least one element. + * + * @throws IllegalStateException Thrown, if the queue is closed. + * @throws InterruptedException Throw, if the thread is interrupted while waiting for an + * element to be added. + */ + public List<E> getBatchBlocking() throws InterruptedException { + lock.lock(); + try { + while (open && elements.isEmpty()) { + nonEmpty.await(); + } + if (open) { + ArrayList<E> result = new ArrayList<>(elements); + elements.clear(); + return result; + } else { + throw new IllegalStateException("queue is closed"); + } + } finally { + lock.unlock(); + } + } + + /** + * Gets all the elements found in the list, or blocks until at least one element + * was added. This method is similar as {@link #getBatchBlocking()}, but takes + * a number of milliseconds that the method will maximally wait before returning. + * + * <p>This method never returns null, but an empty list, if the queue is empty when + * the method is called and the request times out before an element was added. + * + * <p>The method throws an {@code IllegalStateException} if the queue is closed. + * Checking whether the queue is open and removing the next element is one atomic operation. + * + * @param timeoutMillis The number of milliseconds to wait, at most. + * @return A list with all elements in the queue, possible an empty list. + * + * @throws IllegalStateException Thrown, if the queue is closed. + * @throws InterruptedException Throw, if the thread is interrupted while waiting for an + * element to be added. + */ + public List<E> getBatchBlocking(long timeoutMillis) throws InterruptedException { + if (timeoutMillis == 0L) { + // wait forever case + return getBatchBlocking(); + } else if (timeoutMillis < 0L) { + throw new IllegalArgumentException("invalid timeout"); + } + + final long deadline = System.currentTimeMillis() + timeoutMillis; + + lock.lock(); + try { + while (open && elements.isEmpty() && timeoutMillis > 0) { + nonEmpty.await(timeoutMillis, TimeUnit.MILLISECONDS); + timeoutMillis = deadline - System.currentTimeMillis(); + } + + if (!open) { + throw new IllegalStateException("queue is closed"); + } + else if (elements.isEmpty()) { + return Collections.emptyList(); + } + else { + ArrayList<E> result = new ArrayList<>(elements); + elements.clear(); + return result; + } + } finally { + lock.unlock(); + } + } + + // ------------------------------------------------------------------------ + // Standard Utilities + // ------------------------------------------------------------------------ + + @Override + public int hashCode() { + int hashCode = 17; + for (E element : elements) { + hashCode = 31 * hashCode + element.hashCode(); + } + return hashCode; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } else if (obj != null && obj.getClass() == ClosableBlockingQueue.class) { + @SuppressWarnings("unchecked") + ClosableBlockingQueue<E> that = (ClosableBlockingQueue<E>) obj; + + if (this.elements.size() == that.elements.size()) { + Iterator<E> thisElements = this.elements.iterator(); + for (E thatNext : that.elements) { + E thisNext = thisElements.next(); + if (!(thisNext == null ? thatNext == null : thisNext.equals(thatNext))) { + return false; + } + } + return true; + } else { + return false; + } + } else { + return false; + } + } + + @Override + public String toString() { + return elements.toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/fd324ea7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java new file mode 100644 index 0000000..6298c92 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java @@ -0,0 +1,603 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.*; +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; + +public class ClosableBlockingQueueTest { + + // ------------------------------------------------------------------------ + // single-threaded unit tests + // ------------------------------------------------------------------------ + + @Test + public void testCreateQueueHashCodeEquals() { + try { + ClosableBlockingQueue<String> queue1 = new ClosableBlockingQueue<>(); + ClosableBlockingQueue<String> queue2 = new ClosableBlockingQueue<>(22); + + assertTrue(queue1.isOpen()); + assertTrue(queue2.isOpen()); + assertTrue(queue1.isEmpty()); + assertTrue(queue2.isEmpty()); + assertEquals(0, queue1.size()); + assertEquals(0, queue2.size()); + + assertTrue(queue1.hashCode() == queue2.hashCode()); + //noinspection EqualsWithItself + assertTrue(queue1.equals(queue1)); + //noinspection EqualsWithItself + assertTrue(queue2.equals(queue2)); + assertTrue(queue1.equals(queue2)); + + assertNotNull(queue1.toString()); + assertNotNull(queue2.toString()); + + List<String> elements = new ArrayList<>(); + elements.add("a"); + elements.add("b"); + elements.add("c"); + + ClosableBlockingQueue<String> queue3 = new ClosableBlockingQueue<>(elements); + ClosableBlockingQueue<String> queue4 = new ClosableBlockingQueue<>(asList("a", "b", "c")); + + assertTrue(queue3.isOpen()); + assertTrue(queue4.isOpen()); + assertFalse(queue3.isEmpty()); + assertFalse(queue4.isEmpty()); + assertEquals(3, queue3.size()); + assertEquals(3, queue4.size()); + + assertTrue(queue3.hashCode() == queue4.hashCode()); + //noinspection EqualsWithItself + assertTrue(queue3.equals(queue3)); + //noinspection EqualsWithItself + assertTrue(queue4.equals(queue4)); + assertTrue(queue3.equals(queue4)); + + assertNotNull(queue3.toString()); + assertNotNull(queue4.toString()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCloseEmptyQueue() { + try { + ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>(); + assertTrue(queue.isOpen()); + assertTrue(queue.close()); + assertFalse(queue.isOpen()); + + assertFalse(queue.addIfOpen("element")); + assertTrue(queue.isEmpty()); + + try { + queue.add("some element"); + fail("should cause an exception"); + } catch (IllegalStateException ignored) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCloseNonEmptyQueue() { + try { + ClosableBlockingQueue<Integer> queue = new ClosableBlockingQueue<>(asList(1, 2, 3)); + assertTrue(queue.isOpen()); + + assertFalse(queue.close()); + assertFalse(queue.close()); + + queue.poll(); + + assertFalse(queue.close()); + assertFalse(queue.close()); + + queue.pollBatch(); + + assertTrue(queue.close()); + assertFalse(queue.isOpen()); + + assertFalse(queue.addIfOpen(42)); + assertTrue(queue.isEmpty()); + + try { + queue.add(99); + fail("should cause an exception"); + } catch (IllegalStateException ignored) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testPeekAndPoll() { + try { + ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>(); + + assertNull(queue.peek()); + assertNull(queue.peek()); + assertNull(queue.poll()); + assertNull(queue.poll()); + + assertEquals(0, queue.size()); + + queue.add("a"); + queue.add("b"); + queue.add("c"); + + assertEquals(3, queue.size()); + + assertEquals("a", queue.peek()); + assertEquals("a", queue.peek()); + assertEquals("a", queue.peek()); + + assertEquals(3, queue.size()); + + assertEquals("a", queue.poll()); + assertEquals("b", queue.poll()); + + assertEquals(1, queue.size()); + + assertEquals("c", queue.peek()); + assertEquals("c", queue.peek()); + + assertEquals("c", queue.poll()); + + assertEquals(0, queue.size()); + assertNull(queue.poll()); + assertNull(queue.peek()); + assertNull(queue.peek()); + + assertTrue(queue.close()); + + try { + queue.peek(); + fail("should cause an exception"); + } catch (IllegalStateException ignored) { + // expected + } + + try { + queue.poll(); + fail("should cause an exception"); + } catch (IllegalStateException ignored) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testPollBatch() { + try { + ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>(); + + assertNull(queue.pollBatch()); + + queue.add("a"); + queue.add("b"); + + assertEquals(asList("a", "b"), queue.pollBatch()); + assertNull(queue.pollBatch()); + + queue.add("c"); + + assertEquals(singletonList("c"), queue.pollBatch()); + assertNull(queue.pollBatch()); + + assertTrue(queue.close()); + + try { + queue.pollBatch(); + fail("should cause an exception"); + } catch (IllegalStateException ignored) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testGetElementBlocking() { + try { + ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>(); + + assertNull(queue.getElementBlocking(1)); + assertNull(queue.getElementBlocking(3)); + assertNull(queue.getElementBlocking(2)); + + assertEquals(0, queue.size()); + + queue.add("a"); + queue.add("b"); + queue.add("c"); + queue.add("d"); + queue.add("e"); + queue.add("f"); + + assertEquals(6, queue.size()); + + assertEquals("a", queue.getElementBlocking(99)); + assertEquals("b", queue.getElementBlocking()); + + assertEquals(4, queue.size()); + + assertEquals("c", queue.getElementBlocking(0)); + assertEquals("d", queue.getElementBlocking(1000000)); + assertEquals("e", queue.getElementBlocking()); + assertEquals("f", queue.getElementBlocking(1786598)); + + assertEquals(0, queue.size()); + + assertNull(queue.getElementBlocking(1)); + assertNull(queue.getElementBlocking(3)); + assertNull(queue.getElementBlocking(2)); + + assertTrue(queue.close()); + + try { + queue.getElementBlocking(); + fail("should cause an exception"); + } catch (IllegalStateException ignored) { + // expected + } + + try { + queue.getElementBlocking(1000000000L); + fail("should cause an exception"); + } catch (IllegalStateException ignored) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testGetBatchBlocking() { + try { + ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>(); + + assertEquals(emptyList(), queue.getBatchBlocking(1)); + assertEquals(emptyList(), queue.getBatchBlocking(3)); + assertEquals(emptyList(), queue.getBatchBlocking(2)); + + queue.add("a"); + queue.add("b"); + + assertEquals(asList("a", "b"), queue.getBatchBlocking(900000009)); + + queue.add("c"); + queue.add("d"); + + assertEquals(asList("c", "d"), queue.getBatchBlocking()); + + assertEquals(emptyList(), queue.getBatchBlocking(2)); + + queue.add("e"); + + assertEquals(singletonList("e"), queue.getBatchBlocking(0)); + + queue.add("f"); + + assertEquals(singletonList("f"), queue.getBatchBlocking(1000000000)); + + assertEquals(0, queue.size()); + + assertEquals(emptyList(), queue.getBatchBlocking(1)); + assertEquals(emptyList(), queue.getBatchBlocking(3)); + assertEquals(emptyList(), queue.getBatchBlocking(2)); + + assertTrue(queue.close()); + + try { + queue.getBatchBlocking(); + fail("should cause an exception"); + } catch (IllegalStateException ignored) { + // expected + } + + try { + queue.getBatchBlocking(1000000000L); + fail("should cause an exception"); + } catch (IllegalStateException ignored) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // ------------------------------------------------------------------------ + // multi-threaded tests + // ------------------------------------------------------------------------ + + @Test + public void notifyOnClose() { + try { + final long oneYear = 365L * 24 * 60 * 60 * 1000; + + // test "getBatchBlocking()" + final ClosableBlockingQueue<String> queue1 = new ClosableBlockingQueue<>(); + QueueCall call1 = new QueueCall() { + @Override + public void call() throws Exception { + queue1.getBatchBlocking(); + } + }; + testCallExitsOnClose(call1, queue1); + + // test "getBatchBlocking()" + final ClosableBlockingQueue<String> queue2 = new ClosableBlockingQueue<>(); + QueueCall call2 = new QueueCall() { + @Override + public void call() throws Exception { + queue2.getBatchBlocking(oneYear); + } + }; + testCallExitsOnClose(call2, queue2); + + // test "getBatchBlocking()" + final ClosableBlockingQueue<String> queue3 = new ClosableBlockingQueue<>(); + QueueCall call3 = new QueueCall() { + @Override + public void call() throws Exception { + queue3.getElementBlocking(); + } + }; + testCallExitsOnClose(call3, queue3); + + // test "getBatchBlocking()" + final ClosableBlockingQueue<String> queue4 = new ClosableBlockingQueue<>(); + QueueCall call4 = new QueueCall() { + @Override + public void call() throws Exception { + queue4.getElementBlocking(oneYear); + } + }; + testCallExitsOnClose(call4, queue4); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + @Test + public void testMultiThreadedAddGet() { + try { + final ClosableBlockingQueue<Integer> queue = new ClosableBlockingQueue<>(); + final AtomicReference<Throwable> pushErrorRef = new AtomicReference<>(); + final AtomicReference<Throwable> pollErrorRef = new AtomicReference<>(); + + final int numElements = 2000; + + Thread pusher = new Thread("pusher") { + + @Override + public void run() { + try { + final Random rnd = new Random(); + for (int i = 0; i < numElements; i++) { + queue.add(i); + + // sleep a bit, sometimes + int sleepTime = rnd.nextInt(3); + if (sleepTime > 1) { + Thread.sleep(sleepTime); + } + } + + while (true) { + if (queue.close()) { + break; + } else { + Thread.sleep(5); + } + } + } catch (Throwable t) { + pushErrorRef.set(t); + } + } + }; + pusher.start(); + + Thread poller = new Thread("poller") { + + @SuppressWarnings("InfiniteLoopStatement") + @Override + public void run() { + try { + int count = 0; + + try { + final Random rnd = new Random(); + int nextExpected = 0; + + while (true) { + int getMethod = count % 7; + switch (getMethod) { + case 0: { + Integer next = queue.getElementBlocking(1); + if (next != null) { + assertEquals(nextExpected, next.intValue()); + nextExpected++; + count++; + } + break; + } + case 1: { + List<Integer> nextList = queue.getBatchBlocking(); + for (Integer next : nextList) { + assertNotNull(next); + assertEquals(nextExpected, next.intValue()); + nextExpected++; + count++; + } + break; + } + case 2: { + List<Integer> nextList = queue.getBatchBlocking(1); + if (nextList != null) { + for (Integer next : nextList) { + assertNotNull(next); + assertEquals(nextExpected, next.intValue()); + nextExpected++; + count++; + } + } + break; + } + case 3: { + Integer next = queue.poll(); + if (next != null) { + assertEquals(nextExpected, next.intValue()); + nextExpected++; + count++; + } + break; + } + case 4: { + List<Integer> nextList = queue.pollBatch(); + if (nextList != null) { + for (Integer next : nextList) { + assertNotNull(next); + assertEquals(nextExpected, next.intValue()); + nextExpected++; + count++; + } + } + break; + } + default: { + Integer next = queue.getElementBlocking(); + assertNotNull(next); + assertEquals(nextExpected, next.intValue()); + nextExpected++; + count++; + } + } + + // sleep a bit, sometimes + int sleepTime = rnd.nextInt(3); + if (sleepTime > 1) { + Thread.sleep(sleepTime); + } + } + } catch (IllegalStateException e) { + // we get this once the queue is closed + assertEquals(numElements, count); + } + } catch (Throwable t) { + pollErrorRef.set(t); + } + } + }; + poller.start(); + + pusher.join(); + poller.join(); + + if (pushErrorRef.get() != null) { + Throwable t = pushErrorRef.get(); + t.printStackTrace(); + fail("Error in pusher: " + t.getMessage()); + } + if (pollErrorRef.get() != null) { + Throwable t = pollErrorRef.get(); + t.printStackTrace(); + fail("Error in poller: " + t.getMessage()); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // ------------------------------------------------------------------------ + // Utils + // ------------------------------------------------------------------------ + + private static void testCallExitsOnClose( + final QueueCall call, ClosableBlockingQueue<String> queue) throws Exception { + + final AtomicReference<Throwable> errorRef = new AtomicReference<>(); + + Runnable runnable = new Runnable() { + @Override + public void run() { + try { + call.call(); + } catch (Throwable t) { + errorRef.set(t); + } + } + }; + + Thread thread = new Thread(runnable); + thread.start(); + Thread.sleep(100); + queue.close(); + thread.join(); + + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + Throwable cause = errorRef.get(); + assertTrue(cause instanceof IllegalStateException); + } + + private interface QueueCall { + void call() throws Exception; + } +}