Yingyi Bu has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1770

Change subject: ASTERIXDB-1915: make dataset files uniformly distributed among 
io devices.
......................................................................

ASTERIXDB-1915: make dataset files uniformly distributed among io devices.

Change-Id: I2dd9e17e96c1d4ef55e29d0a0f8feadf8ce321ed
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IFileDeviceComputer.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/DefaultDeviceComputer.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
M 
hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
M 
hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
10 files changed, 109 insertions(+), 24 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/70/1770/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 26952ad..a34fa18 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -53,6 +53,7 @@
 import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.config.IConfigManager;
+import org.apache.hyracks.api.io.IFileDeviceComputer;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
 import org.apache.hyracks.api.messages.IMessageBroker;
 import org.apache.hyracks.control.common.controllers.NCConfig;
@@ -281,6 +282,14 @@
         return runtimeContext;
     }
 
+    @Override
+    public IFileDeviceComputer getFileDeviceComputer() {
+        return (relPath, devices) -> {
+            int ioDeviceIndex = 
Math.abs(StoragePathUtil.getPartitionNumFromRelativePath(relPath) % 
devices.size());
+            return devices.get(ioDeviceIndex);
+        };
+    }
+
     protected IHyracksClientConnection getHcc() throws Exception {
         NodeControllerService ncSrv = (NodeControllerService) 
ncServiceCtx.getControllerService();
         ClusterControllerInfo ccInfo = 
ncSrv.getNodeParameters().getClusterControllerInfo();
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
index 7c2e472..a4c50a7 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
@@ -123,11 +123,11 @@
                     librarian.cleanup();
                 }
                 testExecutor.executeTest(PATH_ACTUAL, tcCtx, null, false, 
ExecutionTestUtil.FailedGroup);
+
                 try {
+                    checkStorageFile();
+                } finally {
                     testExecutor.cleanup(tcCtx.toString(), badTestCases);
-                } catch (Throwable th) {
-                    th.printStackTrace();
-                    throw th;
                 }
             }
         } finally {
@@ -135,6 +135,65 @@
         }
     }
 
+    private static void checkStorageFile() throws Exception {
+        String tempDirPath = System.getProperty("java.io.tmpdir");
+        File dir = new File(tempDirPath);
+        File[] subDirs = dir.listFiles();
+        List<File> ncStores = new ArrayList<>();
+
+        // Finds nc stores.
+        for (File file : subDirs) {
+            if (file.getName().startsWith("asterix_nc")) {
+                ncStores.add(file);
+            }
+        }
+
+        // Checks that dataset files are uniformly distributed across each nc 
store.
+        for (File ncStore : ncStores) {
+            checkNcStore(ncStore);
+        }
+    }
+
+    private static void checkNcStore(File ncStore) throws Exception {
+        File[] ioDevices = ncStore.listFiles();
+        int expectedPartitionNum = -1;
+        for (File ioDevice : ioDevices) {
+            File[] dataDirs = ioDevice.listFiles();
+            for (File dataDir : dataDirs) {
+                String dirName = dataDir.getName();
+                if (!dirName.equals("storage")) {
+                    continue;
+                }
+                int numPartitions = 
getNumResidentPartitions(dataDir.listFiles());
+                if (expectedPartitionNum < 0) {
+                    expectedPartitionNum = numPartitions;
+                } else {
+                    if (expectedPartitionNum != numPartitions) {
+                        throw new Exception("Non-uniform data distribution on 
io devices: " + dataDir.getAbsolutePath()
+                                + " number of partitions: " + numPartitions + 
" expected number of partitions: "
+                                + expectedPartitionNum);
+                    }
+                }
+                break;
+            }
+        }
+    }
+
+    private static int getNumResidentPartitions(File[] partitions) {
+        int num = 0;
+        for (File partition : partitions) {
+            File[] dataverses = partition.listFiles();
+            for (File dv : dataverses) {
+                String dvName = dv.getName();
+                if (!dvName.equals("Metadata")) {
+                    num++;
+                    break;
+                }
+            }
+        }
+        return num;
+    }
+
     private static void checkThreadLeaks() throws IOException {
         String threadDump = 
ThreadDumpHelper.takeDumpJSON(ManagementFactory.getThreadMXBean());
         // Currently we only do sanity check for threads used in the execution 
engine.
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
index 5b35095..775c24e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.api.application;
 
+import org.apache.hyracks.api.io.IFileDeviceComputer;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
 
 public interface INCApplication extends IApplication {
@@ -25,4 +26,11 @@
     void preStop() throws Exception; //NOSONAR
 
     NodeCapacity getCapacity();
+
+    /**
+     * @return the file device computer which resolves the relative path of a 
storage
+     *         file into an io device.
+     */
+    IFileDeviceComputer getFileDeviceComputer();
+
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IFileDeviceComputer.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IFileDeviceComputer.java
index e75efd4..932e1b9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IFileDeviceComputer.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IFileDeviceComputer.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.api.io;
 
+import java.util.List;
+
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
@@ -30,8 +32,8 @@
      * Compute the device from the relative path
      *
      * @param relativePath
-     * @return
+     * @return the resident IO device of the file.
      */
-    IODeviceHandle compute(String relativePath) throws HyracksDataException;
+    IODeviceHandle compute(String relativePath, List<IODeviceHandle> devices) 
throws HyracksDataException;
 
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
index baf69d0..8a8c8c1 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
@@ -27,6 +27,7 @@
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.config.IConfigManager;
 import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.api.io.IFileDeviceComputer;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
 import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.apache.hyracks.control.common.controllers.ControllerConfig;
@@ -79,6 +80,11 @@
         return null;
     }
 
+    @Override
+    public IFileDeviceComputer getFileDeviceComputer() {
+        return null;
+    }
+
     protected void configureLoggingLevel(Level level) {
         Logger.getLogger("org.apache.hyracks").setLevel(level);
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index be24dbe..db2c887 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -172,7 +172,8 @@
         this.application = application;
         id = ncConfig.getNodeId();
 
-        ioManager = new 
IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()));
+        ioManager = new 
IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()),
+                application.getFileDeviceComputer());
         if (id == null) {
             throw new HyracksException("id not set");
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/DefaultDeviceComputer.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/DefaultDeviceComputer.java
index bacb608..fb3c5f6 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/DefaultDeviceComputer.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/DefaultDeviceComputer.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.control.nc.io;
 
 import java.io.File;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -26,24 +27,20 @@
 import org.apache.hyracks.api.io.IODeviceHandle;
 
 public class DefaultDeviceComputer implements IFileDeviceComputer {
-    private final IOManager ioManager;
     private AtomicInteger next = new AtomicInteger(0);
 
-    public DefaultDeviceComputer(IOManager ioManager) {
-        this.ioManager = ioManager;
-    }
-
     @Override
-    public IODeviceHandle compute(String relPath) throws HyracksDataException {
+    public IODeviceHandle compute(String relPath, List<IODeviceHandle> 
devices) throws HyracksDataException {
+        int numDevices = devices.size();
         String path = relPath;
         // if number of devices is 1, we return the device
-        if (ioManager.getIODevices().size() == 1) {
-            return ioManager.getIODevices().get(0);
+        if (numDevices == 1) {
+            return devices.get(0);
         }
         // check if it exists already on a device
         int nextSeparator = path.lastIndexOf(File.separator);
         while (nextSeparator > 0) {
-            for (IODeviceHandle dev : ioManager.getIODevices()) {
+            for (IODeviceHandle dev : devices) {
                 if (dev.contains(path)) {
                     return dev;
                 }
@@ -52,13 +49,13 @@
             nextSeparator = path.lastIndexOf(File.separator);
         }
         // one last attempt
-        for (IODeviceHandle dev : ioManager.getIODevices()) {
+        for (IODeviceHandle dev : devices) {
             if (dev.contains(path)) {
                 return dev;
             }
         }
         // not on any device, round robin assignment
-        return ioManager.getIODevices().get(next.getAndIncrement() % 
ioManager.getIODevices().size());
+        return devices.get(next.getAndIncrement() % numDevices);
     }
 
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index d97a7b5..1eb830a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -57,12 +57,13 @@
     private int workspaceIndex;
     private IFileDeviceComputer deviceComputer;
 
-    public IOManager(List<IODeviceHandle> devices, Executor executor) throws 
HyracksDataException {
-        this(devices);
+    public IOManager(List<IODeviceHandle> devices, Executor executor, 
IFileDeviceComputer deviceComputer)
+            throws HyracksDataException {
+        this(devices, deviceComputer);
         this.executor = executor;
     }
 
-    public IOManager(List<IODeviceHandle> devices) throws HyracksDataException 
{
+    public IOManager(List<IODeviceHandle> devices, IFileDeviceComputer 
deviceComputer) throws HyracksDataException {
         this.ioDevices = Collections.unmodifiableList(devices);
         checkDeviceValidity(devices);
         workspaces = new ArrayList<>();
@@ -76,7 +77,7 @@
             throw new HyracksDataException("No devices with workspace found");
         }
         workspaceIndex = 0;
-        deviceComputer = new DefaultDeviceComputer(this);
+        this.deviceComputer = deviceComputer;
     }
 
     private void checkDeviceValidity(List<IODeviceHandle> devices) throws 
HyracksDataException {
@@ -356,7 +357,7 @@
 
     @Override
     public FileReference resolve(String path) throws HyracksDataException {
-        return new FileReference(deviceComputer.compute(path), path);
+        return new FileReference(deviceComputer.compute(path, getIODevices()), 
path);
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
 
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
index 04efad3..2d968ba 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
@@ -28,6 +28,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.io.IODeviceHandle;
+import org.apache.hyracks.control.nc.io.DefaultDeviceComputer;
 import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IndexLifecycleManager;
@@ -109,7 +110,7 @@
             List<IODeviceHandle> devices = new ArrayList<>();
             devices.add(new IODeviceHandle(new 
File(System.getProperty("user.dir") + File.separator + "target"),
                     "iodev_test_wa"));
-            ioManager = new IOManager(devices, 
Executors.newCachedThreadPool());
+            ioManager = new IOManager(devices, 
Executors.newCachedThreadPool(), new DefaultDeviceComputer());
         }
         return ioManager;
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
 
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
index 88bdb1a..7220dfc 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
@@ -39,6 +39,7 @@
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.nc.io.DefaultDeviceComputer;
 import org.apache.hyracks.control.nc.io.IOManager;
 
 public class TestUtils {
@@ -58,7 +59,7 @@
     private static IOManager createIoManager() throws HyracksException {
         List<IODeviceHandle> devices = new ArrayList<>();
         devices.add(new IODeviceHandle(new 
File(System.getProperty("java.io.tmpdir")), "."));
-        return new IOManager(devices, Executors.newCachedThreadPool());
+        return new IOManager(devices, Executors.newCachedThreadPool(), new 
DefaultDeviceComputer());
     }
 
     public static void compareWithResult(File expectedFile, File actualFile) 
throws Exception {

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1770
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I2dd9e17e96c1d4ef55e29d0a0f8feadf8ce321ed
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Yingyi Bu <[email protected]>

Reply via email to