>From Hussain Towaileb <[email protected]>: Hussain Towaileb has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18711 )
Change subject: Add capability to calculate collection size ...................................................................... Add capability to calculate collection size Change-Id: I02fb8ff0a87ff272aba05a676ccfb46ac370ebd2 --- M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CalculateCollectionSizeResponseMessage.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CollectionSizeResponseMessage.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CollectionSizeRequestMessage.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CalculateCollectionSizeRequestMessage.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java 7 files changed, 345 insertions(+), 8 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/11/18711/1 diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CalculateCollectionSizeRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CalculateCollectionSizeRequestMessage.java new file mode 100644 index 0000000..1baff0f --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CalculateCollectionSizeRequestMessage.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.message; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.asterix.common.api.IClusterManagementWork; +import org.apache.asterix.common.cluster.IClusterStateManager; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.messaging.api.ICcAddressedMessage; +import org.apache.asterix.messaging.CCMessageBroker; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class CalculateCollectionSizeRequestMessage implements ICcAddressedMessage { + + private static final Logger LOGGER = LogManager.getLogger(); + private static final long serialVersionUID = 1L; + private final String nodeId; + private final long reqId; + private final String fqn; + + public CalculateCollectionSizeRequestMessage(String nodeId, long reqId, String fqn) { + this.nodeId = nodeId; + this.reqId = reqId; + this.fqn = fqn; + } + + @Override + public void handle(ICcApplicationContext appCtx) { + try { + long size = 0; + IClusterStateManager csm = appCtx.getClusterStateManager(); + if (csm.getState() == IClusterManagementWork.ClusterState.ACTIVE + || csm.getState() == IClusterManagementWork.ClusterState.REBALANCE_REQUIRED) { + final List<String> ncs = new ArrayList<>(appCtx.getClusterStateManager().getParticipantNodes()); + CCMessageBroker messageBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + + long reqId = messageBroker.newRequestId(); + List<CollectionSizeRequestMessage> requests = new ArrayList<>(); + for (int i = 0; i < ncs.size(); i++) { + requests.add(new CollectionSizeRequestMessage(reqId, fqn)); + } + size = (long) messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, TimeUnit.SECONDS.toMillis(60), + true); + + CalculateCollectionSizeResponseMessage response = + new CalculateCollectionSizeResponseMessage(this.reqId, size); + messageBroker.sendApplicationMessageToNC(response, nodeId); + } + } catch (Exception ex) { + // TODO(htowaileb): proper error message + throw new IllegalStateException(ex); + } + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CalculateCollectionSizeResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CalculateCollectionSizeResponseMessage.java new file mode 100644 index 0000000..eaf5975 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CalculateCollectionSizeResponseMessage.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.message; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.messaging.api.INcAddressedMessage; +import org.apache.asterix.common.messaging.api.MessageFuture; +import org.apache.asterix.messaging.NCMessageBroker; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class CalculateCollectionSizeResponseMessage implements INcAddressedMessage { + + private static final long serialVersionUID = 1L; + private final long reqId; + private final long size; + + public CalculateCollectionSizeResponseMessage(long reqId, long size) { + this.reqId = reqId; + this.size = size; + } + + @Override + public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + MessageFuture future = mb.deregisterMessageFuture(reqId); + if (future != null) { + future.complete(this); + } + } + + public long getSize() { + return size; + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CollectionSizeRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CollectionSizeRequestMessage.java new file mode 100644 index 0000000..6b9cb4a --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CollectionSizeRequestMessage.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.message; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.messaging.CcIdentifiedMessage; +import org.apache.asterix.common.messaging.api.INcAddressedMessage; +import org.apache.asterix.messaging.NCMessageBroker; +import org.apache.hyracks.api.io.IIOManager; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class CollectionSizeRequestMessage extends CcIdentifiedMessage implements INcAddressedMessage { + + private static final Logger LOGGER = LogManager.getLogger(); + private static final long serialVersionUID = 1L; + private final long reqId; + private final String fqn; + + public CollectionSizeRequestMessage(long reqId, String fqn) { + this.reqId = reqId; + this.fqn = fqn; + } + + @Override + public void handle(INcApplicationContext appCtx) { + try { + IIOManager ioManager = appCtx.getPersistenceIoManager(); + CollectionSizeResponseMessage response = + new CollectionSizeResponseMessage(reqId, ioManager.getCollectionSize(fqn), null); + respond(appCtx, response); + } catch (Exception e) { + // TODO(htowaileb): proper error message + LOGGER.error("failed to get collection size", e); + CollectionSizeResponseMessage response = new CollectionSizeResponseMessage(reqId, 0, e); + respond(appCtx, response); + } + } + + private void respond(INcApplicationContext appCtx, CollectionSizeResponseMessage response) { + NCMessageBroker messageBroker = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + try { + messageBroker.sendMessageToPrimaryCC(response); + } catch (Exception e) { + // TODO(htowaileb): proper error message + LOGGER.error("failed to send collection size to cc", e); + } + } + + @Override + public boolean isWhispered() { + return true; + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CollectionSizeResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CollectionSizeResponseMessage.java new file mode 100644 index 0000000..023a7a6 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CollectionSizeResponseMessage.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.message; + +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.messaging.api.ICCMessageBroker; +import org.apache.asterix.common.messaging.api.ICCMessageBroker.ResponseState; +import org.apache.asterix.common.messaging.api.ICcAddressedMessage; +import org.apache.asterix.common.messaging.api.INcResponse; +import org.apache.commons.lang3.tuple.MutablePair; + +public class CollectionSizeResponseMessage implements ICcAddressedMessage, INcResponse { + + private static final long serialVersionUID = 1L; + private final long reqId; + private final long collectionSize; + private final Throwable failure; + + public CollectionSizeResponseMessage(long reqId, long collectionSize, Throwable failure) { + this.reqId = reqId; + this.collectionSize = collectionSize; + this.failure = failure; + } + + @Override + public void handle(ICcApplicationContext appCtx) { + ICCMessageBroker broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + broker.respond(reqId, this); + } + + @Override + public void setResult(MutablePair<ResponseState, Object> result) { + if (failure != null) { + result.setLeft(ResponseState.FAILURE); + result.setRight(failure); + return; + } + setResponse(result); + } + + private void setResponse(MutablePair<ResponseState, Object> result) { + switch (result.getKey()) { + case SUCCESS: + long currentSize = (long) result.getValue(); + result.setValue(currentSize + collectionSize); + break; + case UNINITIALIZED: + result.setLeft(ResponseState.SUCCESS); + result.setValue(collectionSize); + break; + default: + break; + } + } + + @Override + public boolean isWhispered() { + return true; + } +} diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java index c005253..a006dec 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java @@ -30,6 +30,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.function.Predicate; import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation; import org.apache.asterix.cloud.bulk.NoOpDeleteBulkCallBack; @@ -68,6 +69,8 @@ public abstract class AbstractCloudIOManager extends IOManager implements IPartitionBootstrapper, ICloudIOManager { private static final Logger LOGGER = LogManager.getLogger(); private static final byte[] EMPTY_FILE_BYTES = "empty".getBytes(); + private static final Predicate<String> NO_OP_LIST_FILES_FILTER = (path) -> true; + protected final ICloudClient cloudClient; protected final ICloudGuardian guardian; protected final IWriteBufferProvider writeBufferProvider; @@ -376,7 +379,7 @@ public final JsonNode listAsJson(ObjectMapper objectMapper) { ArrayNode objectsInfo = objectMapper.createArrayNode(); try { - List<CloudFile> allFiles = list(); + List<CloudFile> allFiles = list(NO_OP_LIST_FILES_FILTER); allFiles.sort((x, y) -> String.CASE_INSENSITIVE_ORDER.compare(x.getPath(), y.getPath())); for (CloudFile file : allFiles) { ObjectNode objectInfo = objectsInfo.addObject(); @@ -393,7 +396,7 @@ } } - private List<CloudFile> list() { + private List<CloudFile> list(Predicate<String> filter) { List<CloudFile> allFiles = new ArrayList<>(); // get cached files (read from disk) for (IODeviceHandle deviceHandle : getIODevices()) { @@ -409,7 +412,9 @@ for (FileReference fileReference : deviceFiles) { try { - allFiles.add(CloudFile.of(fileReference.getRelativePath(), fileReference.getFile().length())); + if (filter.test(fileReference.getRelativePath())) { + allFiles.add(CloudFile.of(fileReference.getRelativePath(), fileReference.getFile().length())); + } } catch (Throwable th) { LOGGER.warn("Encountered issue for local storage file {}", fileReference.getRelativePath(), th); } @@ -418,7 +423,9 @@ // get uncached files from uncached files tracker for (UncachedFileReference uncachedFile : getUncachedFiles()) { - allFiles.add(CloudFile.of(uncachedFile.getRelativePath(), uncachedFile.getSize())); + if (filter.test(uncachedFile.getRelativePath())) { + allFiles.add(CloudFile.of(uncachedFile.getRelativePath(), uncachedFile.getSize())); + } } return allFiles; } @@ -468,10 +475,46 @@ } public long getTotalRemoteStorageSizeForNodeBytes() { - long size = 0; - for (CloudFile file : list()) { - size += file.getSize(); + return getResourceTotalSize(NO_OP_LIST_FILES_FILTER); + } + + @Override + public long getCollectionSize(String fqn) { + return getResourceTotalSize(path -> path.contains(fqn)); + } + + private long getResourceTotalSize(Predicate<String> relativePathFilter) { + long totalSize = 0; + + // get cached files (read from disk) + for (IODeviceHandle deviceHandle : getIODevices()) { + FileReference storageRoot = deviceHandle.createFileRef(STORAGE_ROOT_DIR_NAME); + + Set<FileReference> deviceFiles; + try { + deviceFiles = localIoManager.list(storageRoot, IoUtil.NO_OP_FILTER); + } catch (Throwable th) { + LOGGER.warn("Failed to get local storage files for root {}", storageRoot.getRelativePath(), th); + continue; + } + + for (FileReference fileReference : deviceFiles) { + try { + if (relativePathFilter.test(fileReference.getRelativePath())) { + totalSize += fileReference.getFile().length(); + } + } catch (Throwable th) { + LOGGER.warn("Encountered issue for local storage file {}", fileReference.getRelativePath(), th); + } + } } - return size; + + // get uncached files from uncached files tracker + for (UncachedFileReference uncachedFile : getUncachedFiles()) { + if (relativePathFilter.test(uncachedFile.getRelativePath())) { + totalSize += uncachedFile.getSize(); + } + } + return totalSize; } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java index a6520c6..beca30d 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java @@ -156,6 +156,14 @@ long getSize(FileReference fileReference) throws HyracksDataException; /** + * Gets the size of the provided collection + * + * @param fqn fully qualified name of the collection + * @return resource size + */ + long getCollectionSize(String fqn) throws HyracksDataException; + + /** * Returns a new write channel * * @param fileHandle handle of the opened file diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java index b35111e..aac66ea 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java @@ -601,6 +601,13 @@ ((AbstractBulkOperation) bulkOperation).performOperation(); } + @Override + public long getCollectionSize(String fqn) { + // TODO(htowaileb): this should have similar implementation to AbstractCloudIOManager#getCollectionSize + // with minor modification, but return 0 for now. + return 0; + } + public void setSpaceMaker(IDiskSpaceMaker spaceMaker) { this.spaceMaker = spaceMaker; } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18711 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: I02fb8ff0a87ff272aba05a676ccfb46ac370ebd2 Gerrit-Change-Number: 18711 Gerrit-PatchSet: 1 Gerrit-Owner: Hussain Towaileb <[email protected]> Gerrit-MessageType: newchange
