ericpai commented on a change in pull request #5294: URL: https://github.com/apache/iotdb/pull/5294#discussion_r830827405
########## File path: server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/IndexedBlockingQueue.java ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.mpp.schedule.queue; + +/** + * The base class of a special kind of blocking queue, which has these characters: + * + * <p>1. Thread-safe. + * + * <p>2. Can poll from queue head. When the queue is empty, the poll() will be blocked until an + * element is inserted. + * + * <p>3. Can push a non-null element to queue. When the queue is beyond the max size, an exception + * will be thrown. + * + * <p>4. Can remove an element by a type of {@link ID}. + * + * <p>5. Each element has the different ID. + */ +public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> { + + private final int MAX_CAPACITY; + private final E queryHolder; + private int size; + + /** + * Init the queue with a max capacity. The queryHolder is just a simple reused object in query to + * avoid small objects allocation. It should be not used in any other places out of the queue as + * the id may be mutated. + * + * @param maxCapacity the max capacity of the queue. + * @param queryHolder the query holder instance. 
+ * @throws IllegalArgumentException if maxCapacity <= 0. + */ + public IndexedBlockingQueue(int maxCapacity, E queryHolder) { + this.MAX_CAPACITY = maxCapacity; + this.queryHolder = queryHolder; + } + + /** + * Get and remove the first element of the queue. If the queue is empty, this call will be blocked + * until an element has been pushed. + * + * @return the queue head element. + */ + public synchronized E poll() throws InterruptedException { + while (isEmpty()) { + this.wait(); + } + E output = pollFirst(); + size--; + return output; + } + + /** + * Push an element to the queue. The new element position is determined by the implementation. If + * the queue size has been reached the maxCapacity, an {@link IllegalStateException} will be + * thrown. If the element is null, an {@link NullPointerException} will be thrown. + * + * @param element the element to be pushed. + * @throws NullPointerException the pushed element is null. + * @throws IllegalStateException the queue size has been reached the maxCapacity. + */ + public synchronized void push(E element) { + if (element == null) { + throw new NullPointerException("pushed element is null"); + } + int sizeDelta = contains(element) ? 0 : 1; + if (size + sizeDelta > MAX_CAPACITY) { + throw new IllegalStateException("the queue is full"); + } + pushToQueue(element); + size += sizeDelta; + this.notifyAll(); + } + + /** + * Remove and return the element by id. It returns null if it doesn't exist. + * + * @param id the id of the element to be removed. + * @return the removed element. + */ + public synchronized E remove(ID id) { + queryHolder.setId(id); Review comment: The comments have been added. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
