xingtanzjr commented on a change in pull request #5294:
URL: https://github.com/apache/iotdb/pull/5294#discussion_r830818027



##########
File path: 
server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/IndexedBlockingQueue.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.schedule.queue;
+
+/**
+ * The base class of a special kind of blocking queue, which has these 
characters:
+ *
+ * <p>1. Thread-safe.
+ *
+ * <p>2. Can poll from queue head. When the queue is empty, the poll() will be 
blocked until an
+ * element is inserted.
+ *
+ * <p>3. Can push a non-null element to queue. When the queue is beyond the 
max size, an exception
+ * will be thrown.
+ *
+ * <p>4. Can remove an element by a type of {@link ID}.
+ *
+ * <p>5. Each element has the different ID.
+ */
+public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
+
+  private final int MAX_CAPACITY;
+  private final E queryHolder;
+  private int size;
+
+  /**
+   * Init the queue with a max capacity. The queryHolder is just a simple 
reused object in query to
+   * avoid small objects allocation. It should be not used in any other places 
out of the queue as
+   * the id may be mutated.
+   *
+   * @param maxCapacity the max capacity of the queue.
+   * @param queryHolder the query holder instance.
+   * @throws IllegalArgumentException if maxCapacity <= 0.
+   */
+  public IndexedBlockingQueue(int maxCapacity, E queryHolder) {
+    this.MAX_CAPACITY = maxCapacity;
+    this.queryHolder = queryHolder;
+  }
+
+  /**
+   * Get and remove the first element of the queue. If the queue is empty, 
this call will be blocked
+   * until an element has been pushed.
+   *
+   * @return the queue head element.
+   */
+  public synchronized E poll() throws InterruptedException {
+    while (isEmpty()) {
+      this.wait();
+    }
+    E output = pollFirst();
+    size--;
+    return output;
+  }
+
+  /**
+   * Push an element to the queue. The new element position is determined by 
the implementation. If
+   * the queue size has been reached the maxCapacity, an {@link 
IllegalStateException} will be
+   * thrown. If the element is null, an {@link NullPointerException} will be 
thrown.
+   *
+   * @param element the element to be pushed.
+   * @throws NullPointerException the pushed element is null.
+   * @throws IllegalStateException the queue size has been reached the 
maxCapacity.
+   */
+  public synchronized void push(E element) {
+    if (element == null) {
+      throw new NullPointerException("pushed element is null");
+    }
+    int sizeDelta = contains(element) ? 0 : 1;
+    if (size + sizeDelta > MAX_CAPACITY) {
+      throw new IllegalStateException("the queue is full");
+    }
+    pushToQueue(element);
+    size += sizeDelta;
+    this.notifyAll();
+  }
+
+  /**
+   * Remove and return the element by id. It returns null if it doesn't exist.
+   *
+   * @param id the id of the element to be removed.
+   * @return the removed element.
+   */
+  public synchronized E remove(ID id) {
+    queryHolder.setId(id);

Review comment:
       When using this Queue, we need to ensure the `equals()` and `hashCode()` 
are only related to `E`'s Id. Or it will lead to some unexpected behaviors.
   
   I am not sure whether we need to add some constraints or not... Just a 
kindly thought... 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to