>From Wail Alkowaileet <[email protected]>:
Wail Alkowaileet has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17749 )
Change subject: [ASTERIXDB-3253][STO] Fix deleting uncached files
......................................................................
[ASTERIXDB-3253][STO] Fix deleting uncached files
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
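When deleting uncached files, we must correctly account
for the files that were deleted from cloud storage but
were not cached locally. To support this,
AbstractBulkOperation#performOperation() now returns an
int count, and DeleteBulkCloudOperation reports the
number of local deletes together with the deleted cloud
paths to an IBulkOperationCallBack.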
Change-Id: Ie72121593acc6f53f9cfb87e54c17577e8633df7
---
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/DeleteBulkOperation.java
A
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/NoOpDeleteBulkCallBack.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/AbstractBulkOperation.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
A
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/IBulkOperationCallBack.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
10 files changed, 134 insertions(+), 11 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/49/17749/1
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 0611688..6973b7b 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -31,6 +31,7 @@
import java.util.Set;
import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
+import org.apache.asterix.cloud.bulk.NoOpDeleteBulkCallBack;
import org.apache.asterix.cloud.clients.CloudClientProvider;
import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.asterix.cloud.util.CloudFileUtil;
@@ -199,7 +200,7 @@
@Override
public IIOBulkOperation createDeleteBulkOperation() {
- return new DeleteBulkCloudOperation(localIoManager, bucket,
cloudClient);
+ return new DeleteBulkCloudOperation(localIoManager, bucket,
cloudClient, NoOpDeleteBulkCallBack.INSTANCE);
}
@Override
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 94fd5f8..01f684b 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -25,6 +25,7 @@
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
import org.apache.asterix.cloud.lazy.accessor.ILazyAccessor;
import org.apache.asterix.cloud.lazy.accessor.ILazyAccessorReplacer;
import org.apache.asterix.cloud.lazy.accessor.InitialCloudAccessor;
@@ -34,6 +35,7 @@
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOBulkOperation;
import org.apache.hyracks.api.io.IODeviceHandle;
import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.control.nc.io.IOManager;
@@ -112,6 +114,12 @@
* IIOManager functions
* ******************************************************************
*/
+
+ @Override
+ public IIOBulkOperation createDeleteBulkOperation() {
+ return new DeleteBulkCloudOperation(localIoManager, bucket,
cloudClient, accessor.getBulkOperationCallBack());
+ }
+
@Override
public Set<FileReference> list(FileReference dir, FilenameFilter filter)
throws HyracksDataException {
return accessor.doList(dir, filter);
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
index f36d594..3135624 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
@@ -24,7 +24,7 @@
import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.hyracks.control.nc.io.bulk.DeleteBulkOperation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -33,26 +33,30 @@
private static final Logger LOGGER = LogManager.getLogger();
private final String bucket;
private final ICloudClient cloudClient;
+ private final IBulkOperationCallBack callBack;
- public DeleteBulkCloudOperation(IIOManager ioManager, String bucket,
ICloudClient cloudClient) {
+ public DeleteBulkCloudOperation(IOManager ioManager, String bucket,
ICloudClient cloudClient,
+ IBulkOperationCallBack callBack) {
super(ioManager);
this.bucket = bucket;
this.cloudClient = cloudClient;
+ this.callBack = callBack;
}
@Override
- public void performOperation() throws HyracksDataException {
+ public int performOperation() throws HyracksDataException {
/*
* TODO What about deleting multiple directories?
* Actually, is there a case where we delete multiple directories
from the cloud?
*/
List<String> paths =
fileReferences.stream().map(FileReference::getRelativePath).collect(Collectors.toList());
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Bulk deleting: {}", paths);
+ LOGGER.debug("Bulk deleting: local: {}, cloud: {}",
fileReferences, paths);
}
cloudClient.deleteObjects(bucket, paths);
-
// Bulk delete locally as well
- super.performOperation();
+ int localDeletes = super.performOperation();
+ callBack.call(localDeletes, paths);
+ return paths.size();
}
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/IBulkOperationCallBack.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/IBulkOperationCallBack.java
new file mode 100644
index 0000000..14a0c4e
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/IBulkOperationCallBack.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.bulk;
+
+import java.util.Collection;
+
+public interface IBulkOperationCallBack {
+ void call(int numberOfAffectedLocalFiles, Collection<String> paths);
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/NoOpDeleteBulkCallBack.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/NoOpDeleteBulkCallBack.java
new file mode 100644
index 0000000..c877be2
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/NoOpDeleteBulkCallBack.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.bulk;
+
+import java.util.Collection;
+
+public class NoOpDeleteBulkCallBack implements IBulkOperationCallBack {
+ public static final IBulkOperationCallBack INSTANCE = new
NoOpDeleteBulkCallBack();
+
+ private NoOpDeleteBulkCallBack() {
+ }
+
+ @Override
+ public void call(int numberOfAffectedLocalFiles, Collection<String> paths)
{
+ // NoOp
+ }
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
index efedd64..8f803a0 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
@@ -22,6 +22,7 @@
import java.util.Set;
import org.apache.asterix.cloud.CloudFileHandle;
+import org.apache.asterix.cloud.bulk.IBulkOperationCallBack;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
@@ -29,6 +30,8 @@
public interface ILazyAccessor {
boolean isLocalAccessor();
+ IBulkOperationCallBack getBulkOperationCallBack();
+
void doOnOpen(CloudFileHandle fileHandle, IIOManager.FileReadWriteMode
rwMode, IIOManager.FileSyncMode syncMode)
throws HyracksDataException;
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java
index 5715f43..378cf03 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java
@@ -22,6 +22,8 @@
import java.util.Set;
import org.apache.asterix.cloud.CloudFileHandle;
+import org.apache.asterix.cloud.bulk.IBulkOperationCallBack;
+import org.apache.asterix.cloud.bulk.NoOpDeleteBulkCallBack;
import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
@@ -43,6 +45,11 @@
}
@Override
+ public IBulkOperationCallBack getBulkOperationCallBack() {
+ return NoOpDeleteBulkCallBack.INSTANCE;
+ }
+
+ @Override
public void doOnOpen(CloudFileHandle fileHandle,
IIOManager.FileReadWriteMode rwMode,
IIOManager.FileSyncMode syncMode) throws HyracksDataException {
// NoOp
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
index f30bf82..19873e8 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
@@ -23,9 +23,11 @@
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.apache.asterix.cloud.CloudFileHandle;
import org.apache.asterix.cloud.WriteBufferProvider;
+import org.apache.asterix.cloud.bulk.IBulkOperationCallBack;
import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.asterix.cloud.util.CloudFileUtil;
import org.apache.asterix.common.utils.StoragePathUtil;
@@ -46,6 +48,7 @@
private final AtomicInteger numberOfUncachedFiles;
private final WriteBufferProvider writeBufferProvider;
private final ILazyAccessorReplacer replacer;
+ private final IBulkOperationCallBack callBack;
public ReplaceableCloudAccessor(ICloudClient cloudClient, String bucket,
IOManager localIoManager,
Set<Integer> partitions, int numberOfUncachedFiles,
WriteBufferProvider writeBufferProvider,
@@ -55,6 +58,10 @@
this.numberOfUncachedFiles = new AtomicInteger(numberOfUncachedFiles);
this.writeBufferProvider = writeBufferProvider;
this.replacer = replacer;
+ this.callBack = (numberOfAffectedLocalFiles, paths) -> {
+ int totalUncached = paths.size() - numberOfAffectedLocalFiles;
+
replaceAccessor(this.numberOfUncachedFiles.addAndGet(-totalUncached));
+ };
}
@Override
@@ -63,6 +70,11 @@
}
@Override
+ public IBulkOperationCallBack getBulkOperationCallBack() {
+ return callBack;
+ }
+
+ @Override
public void doOnOpen(CloudFileHandle fileHandle,
IIOManager.FileReadWriteMode rwMode,
IIOManager.FileSyncMode syncMode) throws HyracksDataException {
FileReference fileRef = fileHandle.getFileReference();
@@ -148,8 +160,17 @@
// Never delete the storage dir in cloud storage
int numberOfCloudDeletes = doCloudDelete(fileReference);
// check local
- if (numberOfCloudDeletes > 0 && localIoManager.exists(fileReference)) {
- int numberOfLocalDeletes = fileReference.getFile().isFile() ? 1 :
localIoManager.list(fileReference).size();
+ if (numberOfCloudDeletes > 0) {
+ int numberOfLocalDeletes;
+ if (numberOfCloudDeletes == 1) {
+ // file delete
+ numberOfLocalDeletes = localIoManager.exists(fileReference) ?
1 : 0;
+ } else {
+ // directory delete
+ Set<String> localToBeDeleted =
localIoManager.list(fileReference).stream()
+
.map(FileReference::getRelativePath).collect(Collectors.toSet());
+ numberOfLocalDeletes = localToBeDeleted.size();
+ }
// Decrement by number of cloud deletes that have no counterparts
locally
decrementNumberOfUncachedFiles(numberOfCloudDeletes -
numberOfLocalDeletes);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/AbstractBulkOperation.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/AbstractBulkOperation.java
index 4ca7b9c..26c2789 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/AbstractBulkOperation.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/AbstractBulkOperation.java
@@ -40,5 +40,5 @@
fileReferences.add(fileReference);
}
- public abstract void performOperation() throws HyracksDataException;
+ public abstract int performOperation() throws HyracksDataException;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/DeleteBulkOperation.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/DeleteBulkOperation.java
index 5ccfdd6..f1911de 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/DeleteBulkOperation.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/DeleteBulkOperation.java
@@ -28,9 +28,12 @@
}
@Override
- public void performOperation() throws HyracksDataException {
+ public int performOperation() throws HyracksDataException {
+ int count = 0;
for (FileReference fileReference : fileReferences) {
+ count += ioManager.exists(fileReference) ? 1 : 0;
ioManager.delete(fileReference);
}
+ return count;
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17749
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Ie72121593acc6f53f9cfb87e54c17577e8633df7
Gerrit-Change-Number: 17749
Gerrit-PatchSet: 1
Gerrit-Owner: Wail Alkowaileet <[email protected]>
Gerrit-MessageType: newchange