Repository: drill
Updated Branches:
  refs/heads/master ef0fafea2 -> 9e944c97e


DRILL-5089: Dynamically load the schema of a storage plugin only when a query needs it

For each query, Drill does not need to load all storage plugins, nor all workspaces under the file system plugins.

This patch uses DynamicRootSchema as Drill's root schema, which loads the corresponding storage plugin only when it is needed.
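
In outline, the lazy lookup in DynamicRootSchema (full diff below) behaves like this simplified sketch; the names match the patch, but the body is condensed rather than verbatim:

  @Override
  protected CalciteSchema getImplicitSubSchema(String schemaName, boolean caseSensitive) {
    CalciteSchema retSchema = getSubSchemaMap().get(schemaName);  // already registered?
    if (retSchema == null) {
      // Ask the StoragePluginRegistry for the plugin and register its schemas on demand.
      loadSchemaFactory(schemaName, caseSensitive);
      retSchema = getSubSchemaMap().get(schemaName);
    }
    return retSchema;
  }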

The info-schema reader still needs full schema information, so for it we build a fully initialized schema tree and load the second-level schemas accordingly.

For workspaces under the same FileSystem, there is no need to create a separate FileSystem instance for each workspace.

Use the fs.access API to check permissions; it is available since HDFS 2.6, except for the Windows + local file system case.
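
The resulting check in WorkspaceSchemaFactory.accessible (full diff below) reduces to roughly the following, where wsPath is the workspace directory:

  if (SystemUtils.IS_OS_WINDOWS && "file".equalsIgnoreCase(fs.getUri().getScheme())) {
    fs.listStatus(wsPath);             // fs.access is unreliable on the Windows local FS
  } else {
    fs.access(wsPath, FsAction.READ);  // cheaper: no directory listing needed
  }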

Add unit tests that use a broken mock storage plugin whose registerSchema method throws an exception. Without this fix, every query fails, even queries against healthy storage plugins, because Drill still loads all schemas from all storage plugins.
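
In brief, the test (TestSchema below) registers one broken and one good plugin and expects only queries against the broken one to fail:

  cluster.insertMockStorage("mock_broken", true);   // registerSchemas will throw
  cluster.insertMockStorage("mock_good", false);
  // Succeeds with the fix; failed before, because the broken schema was loaded for every query.
  client.queryBuilder().sql("SELECT id_i, name_s10 FROM `mock_good`.`employees_5`").run();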

(cherry picked from commit a66d1d7)


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/18a71a38
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/18a71a38
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/18a71a38

Branch: refs/heads/master
Commit: 18a71a38f6bd1fd33d21d1c68fc23c5901b0080a
Parents: 3f0e517
Author: chunhui-shi <c...@maprtech.com>
Authored: Fri Nov 3 02:06:25 2017 +0200
Committer: Volodymyr Vysotskyi <vvo...@gmail.com>
Committed: Tue Jan 16 12:10:13 2018 +0200

----------------------------------------------------------------------
 .../apache/calcite/jdbc/DynamicRootSchema.java  | 129 +++++++++++++++++++
 .../org/apache/calcite/jdbc/DynamicSchema.java  |  57 ++++++++
 .../apache/drill/exec/ops/FragmentContext.java  |   9 +-
 .../org/apache/drill/exec/ops/QueryContext.java |  11 +-
 .../drill/exec/planner/sql/SqlConverter.java    |  10 +-
 .../drill/exec/store/SchemaTreeProvider.java    |  31 ++++-
 .../exec/store/StoragePluginRegistryImpl.java   |   2 +
 .../exec/store/dfs/FileSystemSchemaFactory.java |  23 +++-
 .../exec/store/dfs/WorkspaceSchemaFactory.java  |  87 ++++++++-----
 .../store/ischema/InfoSchemaBatchCreator.java   |   2 +-
 .../exec/store/mock/MockBreakageStorage.java    |  47 +++++++
 .../exec/store/mock/MockStorageEngine.java      |   7 +-
 .../exec/work/metadata/MetadataProvider.java    |   2 +-
 .../drill/exec/physical/impl/TestSchema.java    |  87 +++++++++++++
 .../drill/test/ClusterFixtureBuilder.java       |   4 +
 .../drill/test/ClusterMockStorageFixture.java   |  51 ++++++++
 16 files changed, 512 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/18a71a38/exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java 
b/exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java
new file mode 100644
index 0000000..cde46f2
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.jdbc;
+
+import com.google.common.collect.Lists;
+import org.apache.calcite.DataContext;
+
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.SubSchemaWrapper;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * This class allows us to load schemas from storage plugins lazily, when
+ * {@link #getSubSchema(String, boolean)} is called.
+ */
+public class DynamicRootSchema extends DynamicSchema {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DynamicRootSchema.class);
+
+  protected SchemaConfig schemaConfig;
+  protected StoragePluginRegistry storages;
+
+  public StoragePluginRegistry getSchemaFactories() {
+    return storages;
+  }
+
+  /** Creates a root schema. */
+  DynamicRootSchema(StoragePluginRegistry storages, SchemaConfig schemaConfig) {
+    super(null, new RootSchema(), "");
+    this.schemaConfig = schemaConfig;
+    this.storages = storages;
+  }
+
+  @Override
+  protected CalciteSchema getImplicitSubSchema(String schemaName,
+                                               boolean caseSensitive) {
+    CalciteSchema retSchema = getSubSchemaMap().get(schemaName);
+    if (retSchema != null) {
+      return retSchema;
+    }
+
+    loadSchemaFactory(schemaName, caseSensitive);
+    retSchema = getSubSchemaMap().get(schemaName);
+    return retSchema;
+  }
+
+  /**
+   * Loads the schema factory (storage plugin) for the given schema name.
+   * @param schemaName
+   * @param caseSensitive
+   */
+  public void loadSchemaFactory(String schemaName, boolean caseSensitive) {
+    try {
+      SchemaPlus thisPlus = this.plus();
+      StoragePlugin plugin = getSchemaFactories().getPlugin(schemaName);
+      if (plugin != null) {
+        plugin.registerSchemas(schemaConfig, thisPlus);
+        return;
+      }
+
+      // Could not find a plugin named schemaName. The schemaName could be `dfs.tmp`, a second-level schema under 'dfs'.
+      String[] paths = schemaName.split("\\.");
+      if (paths.length == 2) {
+        plugin = getSchemaFactories().getPlugin(paths[0]);
+        if (plugin == null) {
+          return;
+        }
+
+        // Found the storage plugin for the first part (e.g. 'dfs') of schemaName (e.g. 'dfs.tmp'),
+        // so register that plugin's schemas under 'this'.
+        plugin.registerSchemas(schemaConfig, thisPlus);
+
+        // Load second level schemas for this storage plugin
+        final SchemaPlus firstLevelSchema = thisPlus.getSubSchema(paths[0]);
+        final List<SchemaPlus> secondLevelSchemas = Lists.newArrayList();
+        for (String secondLevelSchemaName : firstLevelSchema.getSubSchemaNames()) {
+          secondLevelSchemas.add(firstLevelSchema.getSubSchema(secondLevelSchemaName));
+        }
+
+        for (SchemaPlus schema : secondLevelSchemas) {
+          org.apache.drill.exec.store.AbstractSchema drillSchema;
+          try {
+            drillSchema = schema.unwrap(org.apache.drill.exec.store.AbstractSchema.class);
+          } catch (ClassCastException e) {
+            throw new RuntimeException(String.format("Schema '%s' is not expected under root schema", schema.getName()));
+          }
+          SubSchemaWrapper wrapper = new SubSchemaWrapper(drillSchema);
+          thisPlus.add(wrapper.getName(), wrapper);
+        }
+      }
+    } catch (ExecutionSetupException | IOException ex) {
+      logger.warn("Failed to load schema for \"" + schemaName + "\"!", ex);
+    }
+  }
+
+  static class RootSchema extends AbstractSchema {
+    @Override public Expression getExpression(SchemaPlus parentSchema,
+                                              String name) {
+      return Expressions.call(
+          DataContext.ROOT,
+          BuiltInMethod.DATA_CONTEXT_GET_ROOT_SCHEMA.method);
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/drill/blob/18a71a38/exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicSchema.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicSchema.java 
b/exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicSchema.java
new file mode 100644
index 0000000..01c38c2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicSchema.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.jdbc;
+
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+
+/**
+ * Unlike SimpleCalciteSchema, DynamicSchema can have an empty or partial schemaMap; it maintains a map of
+ * name->SchemaFactory and only registers a schema when the corresponding name is requested.
+ */
+public class DynamicSchema extends SimpleCalciteSchema {
+
+  public DynamicSchema(CalciteSchema parent, Schema schema, String name) {
+    super(parent, schema, name);
+  }
+
+  @Override
+  protected CalciteSchema getImplicitSubSchema(String schemaName,
+                                               boolean caseSensitive) {
+    Schema s = schema.getSubSchema(schemaName);
+    if (s != null) {
+      return new DynamicSchema(this, s, schemaName);
+    }
+    CalciteSchema ret = getSubSchemaMap().get(schemaName);
+    return ret;
+  }
+
+  @Override
+  public SchemaPlus plus() {
+    return super.plus();
+  }
+
+  public static SchemaPlus createRootSchema(StoragePluginRegistry storages, SchemaConfig schemaConfig) {
+    DynamicRootSchema rootSchema = new DynamicRootSchema(storages, schemaConfig);
+    return rootSchema.plus();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/18a71a38/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 736d550..210d0d4 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -230,7 +230,12 @@ public class FragmentContext extends BaseFragmentContext 
implements AutoCloseabl
     return context;
   }
 
-  public SchemaPlus getRootSchema() {
+  /**
+   * This method is only used to construct the InfoSchemaReader. The reader needs the full schema,
+   * so we return a fully initialized schema tree here.
+   * @return the root schema's SchemaPlus
+   */
+  public SchemaPlus getFullRootSchema() {
     if (queryContext == null) {
       fail(new UnsupportedOperationException("Schema tree can only be created 
in root fragment. " +
           "This is a non-root fragment."));
@@ -248,7 +253,7 @@ public class FragmentContext extends BaseFragmentContext 
implements AutoCloseabl
         .setIgnoreAuthErrors(isImpersonationEnabled)
         .build();
 
-    return queryContext.getRootSchema(schemaConfig);
+    return queryContext.getFullRootSchema(schemaConfig);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/drill/blob/18a71a38/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 8dbddbf..eb32bc6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -163,14 +163,23 @@ public class QueryContext implements AutoCloseable, 
OptimizerRulesContext, Schem
   }
 
   /**
-   *  Create and return a SchemaTree with given <i>schemaConfig</i>.
+   *  Create and return a SchemaTree with the given <i>schemaConfig</i>; some schemas
+   *  (from storage plugins) may be initialized lazily.
    * @param schemaConfig
    * @return
    */
   public SchemaPlus getRootSchema(SchemaConfig schemaConfig) {
     return schemaTreeProvider.createRootSchema(schemaConfig);
   }
+
+  /**
+   *  Create and return a fully initialized SchemaTree with the given <i>schemaConfig</i>.
+   * @param schemaConfig
+   * @return
+   */
+  public SchemaPlus getFullRootSchema(SchemaConfig schemaConfig) {
+    return schemaTreeProvider.createFullRootSchema(schemaConfig);
+  }
+
   /**
    * Get the user name of the user who issued the query that is managed by 
this QueryContext.
    * @return

http://git-wip-us.apache.org/repos/asf/drill/blob/18a71a38/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
index 466d2fe..af3c2bf 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
@@ -29,7 +29,7 @@ import com.google.common.collect.Sets;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.config.CalciteConnectionConfigImpl;
 import org.apache.calcite.config.CalciteConnectionProperty;
-import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.jdbc.DynamicSchema;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.plan.ConventionTraitDef;
 import org.apache.calcite.plan.RelOptCluster;
@@ -130,9 +130,9 @@ public class SqlConverter {
     this.session = context.getSession();
     this.drillConfig = context.getConfig();
     this.catalog = new DrillCalciteCatalogReader(
-        this.rootSchema,
+        rootSchema,
         parserConfig.caseSensitive(),
-        CalciteSchema.from(defaultSchema).path(null),
+        DynamicSchema.from(defaultSchema).path(null),
         typeFactory,
         drillConfig,
         session);
@@ -375,7 +375,7 @@ public class SqlConverter {
     @Override
     public RelRoot expandView(RelDataType rowType, String queryString, SchemaPlus rootSchema, List<String> schemaPath) {
       final DrillCalciteCatalogReader catalogReader = new DrillCalciteCatalogReader(
-          rootSchema, // new root schema
+          rootSchema,
           parserConfig.caseSensitive(),
           schemaPath,
           typeFactory,
@@ -555,7 +555,7 @@ public class SqlConverter {
                               JavaTypeFactory typeFactory,
                               DrillConfig drillConfig,
                               UserSession session) {
-      super(CalciteSchema.from(rootSchema), defaultSchema,
+      super(DynamicSchema.from(rootSchema), defaultSchema,
           typeFactory, getConnectionConfig(caseSensitive));
       this.drillConfig = drillConfig;
       this.session = session;

http://git-wip-us.apache.org/repos/asf/drill/blob/18a71a38/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java
index 21ab39f..0731387 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java
@@ -20,12 +20,12 @@ package org.apache.drill.exec.store;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.ViewExpansionContext;
+import org.apache.calcite.jdbc.DynamicSchema;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.OptionValue;
@@ -105,12 +105,36 @@ public class SchemaTreeProvider implements AutoCloseable {
    * @return
    */
   public SchemaPlus createRootSchema(SchemaConfig schemaConfig) {
+    final SchemaPlus rootSchema = DynamicSchema.createRootSchema(dContext.getStorage(), schemaConfig);
+    schemaTreesToClose.add(rootSchema);
+    return rootSchema;
+  }
+
+  /**
+   * Return full root schema with schema owner as the given user.
+   *
+   * @param userName Name of the user who is accessing the storage sources.
+   * @param provider {@link SchemaConfigInfoProvider} instance
+   * @return Root of the schema tree.
+   */
+  public SchemaPlus createFullRootSchema(final String userName, final SchemaConfigInfoProvider provider) {
+    final String schemaUser = isImpersonationEnabled ? userName : ImpersonationUtil.getProcessUserName();
+    final SchemaConfig schemaConfig = SchemaConfig.newBuilder(schemaUser, provider).build();
+    return createFullRootSchema(schemaConfig);
+  }
+
+  /**
+   * Create and return a full SchemaTree with the given <i>schemaConfig</i>.
+   * @param schemaConfig
+   * @return
+   */
+  public SchemaPlus createFullRootSchema(SchemaConfig schemaConfig) {
     try {
-      final SchemaPlus rootSchema = CalciteSchema.createRootSchema(false, false).plus();
+      final SchemaPlus rootSchema = DynamicSchema.createRootSchema(dContext.getStorage(), schemaConfig);
       dContext.getSchemaFactory().registerSchemas(schemaConfig, rootSchema);
       schemaTreesToClose.add(rootSchema);
       return rootSchema;
-    } catch(IOException e) {
+    } catch (IOException e) {
       // We can't proceed further without a schema, throw a runtime exception.
       // Improve the error message for client side.
 
@@ -124,6 +148,7 @@ public class SchemaTreeProvider implements AutoCloseable {
           .addContext(contextString)
           .build(logger);
     }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/18a71a38/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
index 3fb1c3a..f2edf5e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
@@ -494,4 +494,6 @@ public class StoragePluginRegistryImpl implements 
StoragePluginRegistry {
     return availablePlugins;
   }
 
+
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/18a71a38/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
index 5d99377..6d88d04 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
@@ -27,6 +27,7 @@ import org.apache.calcite.schema.Function;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 
+import org.apache.drill.exec.store.StoragePlugin;
 import org.apache.drill.exec.store.StorageStrategy;
 import org.apache.drill.exec.planner.logical.CreateTableEntry;
 import org.apache.drill.exec.store.AbstractSchema;
@@ -38,7 +39,9 @@ import 
org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.WorkspaceSchema;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import org.apache.drill.exec.util.DrillFileSystemUtil;
+import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 
@@ -49,11 +52,23 @@ public class FileSystemSchemaFactory implements 
SchemaFactory{
 
   public static final String DEFAULT_WS_NAME = "default";
 
+  public static final String LOCAL_FS_SCHEME = "file";
+
   private List<WorkspaceSchemaFactory> factories;
   private String schemaName;
+  protected FileSystemPlugin plugin;
 
   public FileSystemSchemaFactory(String schemaName, List<WorkspaceSchemaFactory> factories) {
-    super();
+    // when the corresponding FileSystemPlugin is not passed in, we dig into ANY workspace factory to get it.
+    if (factories.size() > 0) {
+      this.plugin = factories.get(0).getPlugin();
+    }
+    this.schemaName = schemaName;
+    this.factories = factories;
+  }
+
+  public FileSystemSchemaFactory(FileSystemPlugin plugin, String schemaName, List<WorkspaceSchemaFactory> factories) {
+    this.plugin = plugin;
     this.schemaName = schemaName;
     this.factories = factories;
   }
@@ -73,10 +88,10 @@ public class FileSystemSchemaFactory implements 
SchemaFactory{
 
     public FileSystemSchema(String name, SchemaConfig schemaConfig) throws IOException {
       super(ImmutableList.<String>of(), name);
+      final DrillFileSystem fs = ImpersonationUtil.createFileSystem(schemaConfig.getUserName(), plugin.getFsConf());
       for(WorkspaceSchemaFactory f :  factories){
-        if (f.accessible(schemaConfig.getUserName())) {
-          @SuppressWarnings("resource")
-          WorkspaceSchema s = f.createSchema(getSchemaPath(), schemaConfig);
+        WorkspaceSchema s = f.createSchema(getSchemaPath(), schemaConfig, fs);
+        if (s != null) {
           schemaMap.put(s.getName(), s);
         }
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/18a71a38/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index bbf013d..a3886bb 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -42,6 +42,7 @@ import org.apache.calcite.schema.FunctionParameter;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.schema.TableMacro;
 import org.apache.calcite.schema.TranslatableTable;
+import org.apache.commons.lang3.SystemUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.common.config.LogicalPlanPersistence;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -70,6 +71,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.AccessControlException;
 
@@ -150,14 +152,30 @@ public class WorkspaceSchemaFactory {
    * @return True if the user has access. False otherwise.
    */
   public boolean accessible(final String userName) throws IOException {
-    final FileSystem fs = ImpersonationUtil.createFileSystem(userName, fsConf);
+    final DrillFileSystem fs = ImpersonationUtil.createFileSystem(userName, fsConf);
+    return accessible(fs);
+  }
+
+  /**
+   * Checks whether the given FileSystem object has permission to list/read the workspace directory.
+   * @param fs a DrillFileSystem object created with a particular user's privileges
+   * @return True if the user has access. False otherwise.
+   * @throws IOException
+   */
+  public boolean accessible(DrillFileSystem fs) throws IOException {
     try {
-      // We have to rely on the listStatus as a FileSystem can have 
complicated controls such as regular unix style
-      // permissions, Access Control Lists (ACLs) or Access Control 
Expressions (ACE). Hadoop 2.7 version of FileSystem
-      // has a limited private API (FileSystem.access) to check the 
permissions directly
-      // (see https://issues.apache.org/jira/browse/HDFS-6570). Drill 
currently relies on Hadoop 2.5.0 version of
-      // FileClient. TODO: Update this when DRILL-3749 is fixed.
-      fs.listStatus(wsPath);
+      /*
+       * On the Windows local file system, fs.access ends up using DeprecatedRawLocalFileStatus, which has
+       * TrustedInstaller as the owner, so even a member of the Administrators group cannot satisfy the
+       * permission check. In that case we still use listStatus.
+       * In all other cases we use the access method, since it is cheaper.
+       */
+      if (SystemUtils.IS_OS_WINDOWS && fs.getUri().getScheme().equalsIgnoreCase(FileSystemSchemaFactory.LOCAL_FS_SCHEME)) {
+        fs.listStatus(wsPath);
+      } else {
+        fs.access(wsPath, FsAction.READ);
+      }
     } catch (final UnsupportedOperationException e) {
       logger.trace("The filesystem for this workspace does not support this 
operation.", e);
     } catch (final FileNotFoundException | AccessControlException e) {
@@ -171,8 +189,19 @@ public class WorkspaceSchemaFactory {
     return DotDrillType.VIEW.getPath(config.getLocation(), name);
   }
 
-  public WorkspaceSchema createSchema(List<String> parentSchemaPath, 
SchemaConfig schemaConfig) throws IOException {
-    return new WorkspaceSchema(parentSchemaPath, schemaName, schemaConfig);
+  public WorkspaceSchema createSchema(List<String> parentSchemaPath, 
SchemaConfig schemaConfig, DrillFileSystem fs) throws IOException {
+    if (!accessible(fs)) {
+      return null;
+    }
+    return new WorkspaceSchema(parentSchemaPath, schemaName, schemaConfig, fs);
+  }
+
+  public String getSchemaName() {
+    return schemaName;
+  }
+
+  public FileSystemPlugin getPlugin() {
+    return plugin;
   }
 
   /**
@@ -380,12 +409,12 @@ public class WorkspaceSchemaFactory {
   public class WorkspaceSchema extends AbstractSchema implements ExpandingConcurrentMap.MapValueFactory<TableInstance, DrillTable> {
     private final ExpandingConcurrentMap<TableInstance, DrillTable> tables = new ExpandingConcurrentMap<>(this);
     private final SchemaConfig schemaConfig;
-    private final DrillFileSystem fs;
+    private DrillFileSystem fs;
 
-    public WorkspaceSchema(List<String> parentSchemaPath, String wsName, 
SchemaConfig schemaConfig) throws IOException {
+    public WorkspaceSchema(List<String> parentSchemaPath, String wsName, 
SchemaConfig schemaConfig, DrillFileSystem fs) throws IOException {
       super(parentSchemaPath, wsName);
       this.schemaConfig = schemaConfig;
-      this.fs = ImpersonationUtil.createFileSystem(schemaConfig.getUserName(), 
fsConf);
+      this.fs = fs;
     }
 
     DrillTable getDrillTable(TableInstance key) {
@@ -395,10 +424,10 @@ public class WorkspaceSchemaFactory {
     @Override
     public boolean createView(View view) throws IOException {
       Path viewPath = getViewPath(view.getName());
-      boolean replaced = fs.exists(viewPath);
+      boolean replaced = getFS().exists(viewPath);
       final FsPermission viewPerms =
           new FsPermission(schemaConfig.getOption(ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY).string_val);
-      try (OutputStream stream = DrillFileSystem.create(fs, viewPath, viewPerms)) {
+      try (OutputStream stream = DrillFileSystem.create(getFS(), viewPath, viewPerms)) {
         mapper.writeValue(stream, view);
       }
       return replaced;
@@ -421,7 +450,7 @@ public class WorkspaceSchemaFactory {
 
     @Override
     public void dropView(String viewName) throws IOException {
-      fs.delete(getViewPath(viewName), false);
+      getFS().delete(getViewPath(viewName), false);
     }
 
     private Set<String> getViews() {
@@ -429,7 +458,7 @@ public class WorkspaceSchemaFactory {
       // Look for files with ".view.drill" extension.
       List<DotDrillFile> files;
       try {
-        files = DotDrillUtil.getDotDrills(fs, new Path(config.getLocation()), DotDrillType.VIEW);
+        files = DotDrillUtil.getDotDrills(getFS(), new Path(config.getLocation()), DotDrillType.VIEW);
         for (DotDrillFile f : files) {
           viewSet.add(f.getBaseName());
         }
@@ -498,7 +527,7 @@ public class WorkspaceSchemaFactory {
       List<DotDrillFile> files = Collections.emptyList();
       try {
         try {
-          files = DotDrillUtil.getDotDrills(fs, new Path(config.getLocation()), tableName, DotDrillType.VIEW);
+          files = DotDrillUtil.getDotDrills(getFS(), new Path(config.getLocation()), tableName, DotDrillType.VIEW);
         } catch (AccessControlException e) {
           if (!schemaConfig.getIgnoreAuthErrors()) {
             logger.debug(e.getMessage());
@@ -570,18 +599,18 @@ public class WorkspaceSchemaFactory {
     }
 
     private DrillTable isReadable(FormatMatcher m, FileSelection fileSelection) throws IOException {
-      return m.isReadable(fs, fileSelection, plugin, storageEngineName, schemaConfig.getUserName());
+      return m.isReadable(getFS(), fileSelection, plugin, storageEngineName, schemaConfig.getUserName());
     }
 
     @Override
     public DrillTable create(TableInstance key) {
       try {
-        final FileSelection fileSelection = FileSelection.create(fs, config.getLocation(), key.sig.name, config.allowAccessOutsideWorkspace());
+        final FileSelection fileSelection = FileSelection.create(getFS(), config.getLocation(), key.sig.name, config.allowAccessOutsideWorkspace());
         if (fileSelection == null) {
           return null;
         }
 
-        final boolean hasDirectories = fileSelection.containsDirectories(fs);
+        final boolean hasDirectories = fileSelection.containsDirectories(getFS());
         if (key.sig.params.size() > 0) {
           FormatPluginConfig fconfig = optionExtractor.createConfigForTable(key);
           return new DynamicDrillTable(
@@ -591,7 +620,7 @@ public class WorkspaceSchemaFactory {
         if (hasDirectories) {
           for (final FormatMatcher matcher : dirMatchers) {
             try {
-              DrillTable table = matcher.isReadable(fs, fileSelection, plugin, storageEngineName, schemaConfig.getUserName());
+              DrillTable table = matcher.isReadable(getFS(), fileSelection, plugin, storageEngineName, schemaConfig.getUserName());
               if (table != null) {
                 return table;
               }
@@ -601,13 +630,13 @@ public class WorkspaceSchemaFactory {
           }
         }
 
-        final FileSelection newSelection = hasDirectories ? fileSelection.minusDirectories(fs) : fileSelection;
+        final FileSelection newSelection = hasDirectories ? fileSelection.minusDirectories(getFS()) : fileSelection;
         if (newSelection == null) {
           return null;
         }
 
         for (final FormatMatcher matcher : fileMatchers) {
-          DrillTable table = matcher.isReadable(fs, newSelection, plugin, storageEngineName, schemaConfig.getUserName());
+          DrillTable table = matcher.isReadable(getFS(), newSelection, plugin, storageEngineName, schemaConfig.getUserName());
           if (table != null) {
             return table;
           }
@@ -632,7 +661,7 @@ public class WorkspaceSchemaFactory {
       FormatMatcher matcher = null;
       try {
         for (FormatMatcher m : dropFileMatchers) {
-          if (m.isFileReadable(fs, file)) {
+          if (m.isFileReadable(getFS(), file)) {
             return m;
           }
         }
@@ -655,7 +684,7 @@ public class WorkspaceSchemaFactory {
      * @throws IOException in case of problems accessing table files
      */
     private boolean isHomogeneous(String tableName) throws IOException {
-      FileSelection fileSelection = FileSelection.create(fs, config.getLocation(), tableName, config.allowAccessOutsideWorkspace());
+      FileSelection fileSelection = FileSelection.create(getFS(), config.getLocation(), tableName, config.allowAccessOutsideWorkspace());
 
       if (fileSelection == null) {
         throw UserException
@@ -666,15 +695,15 @@ public class WorkspaceSchemaFactory {
 
       FormatMatcher matcher = null;
       Queue<FileStatus> listOfFiles = new LinkedList<>();
-      listOfFiles.addAll(fileSelection.getStatuses(fs));
+      listOfFiles.addAll(fileSelection.getStatuses(getFS()));
 
       while (!listOfFiles.isEmpty()) {
         FileStatus currentFile = listOfFiles.poll();
         if (currentFile.isDirectory()) {
-          listOfFiles.addAll(DrillFileSystemUtil.listFiles(fs, currentFile.getPath(), true));
+          listOfFiles.addAll(DrillFileSystemUtil.listFiles(getFS(), currentFile.getPath(), true));
         } else {
           if (matcher != null) {
-            if (!matcher.isFileReadable(fs, currentFile)) {
+            if (!matcher.isFileReadable(getFS(), currentFile)) {
               return false;
             }
           } else {
@@ -763,7 +792,7 @@ public class WorkspaceSchemaFactory {
       // Then look for files that start with this name and end in .drill.
       List<DotDrillFile> files = Collections.emptyList();
       try {
-        files = DotDrillUtil.getDotDrills(fs, new Path(config.getLocation()), DotDrillType.VIEW);
+        files = DotDrillUtil.getDotDrills(getFS(), new Path(config.getLocation()), DotDrillType.VIEW);
       } catch (AccessControlException e) {
         if (!schemaConfig.getIgnoreAuthErrors()) {
           logger.debug(e.getMessage());

http://git-wip-us.apache.org/repos/asf/drill/blob/18a71a38/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
index 60581a7..ce05543 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
@@ -33,7 +33,7 @@ public class InfoSchemaBatchCreator implements 
BatchCreator<InfoSchemaSubScan>{
   @Override
   public ScanBatch getBatch(FragmentContext context, InfoSchemaSubScan config, 
List<RecordBatch> children)
       throws ExecutionSetupException {
-    RecordReader rr = config.getTable().getRecordReader(context.getRootSchema(), config.getFilter(), context.getOptions());
+    RecordReader rr = config.getTable().getRecordReader(context.getFullRootSchema(), config.getFilter(), context.getOptions());
     return new ScanBatch(config, context, Collections.singletonList(rr));
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/18a71a38/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockBreakageStorage.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockBreakageStorage.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockBreakageStorage.java
new file mode 100644
index 0000000..f2c2d9f
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockBreakageStorage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mock;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.SchemaConfig;
+
+import java.io.IOException;
+
+public class MockBreakageStorage extends MockStorageEngine {
+
+  private boolean breakRegister;
+
+  public MockBreakageStorage(MockStorageEngineConfig configuration, DrillbitContext context, String name) {
+    super(configuration, context, name);
+    breakRegister = false;
+  }
+
+  public void setBreakRegister(boolean breakRegister) {
+    this.breakRegister = breakRegister;
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+    if (breakRegister) {
+      throw new IOException("mock breakRegister!");
+    }
+    super.registerSchemas(schemaConfig, parent);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/18a71a38/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
index 0edf65f..8dd4e2e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
@@ -57,7 +57,7 @@ public class MockStorageEngine extends AbstractStoragePlugin {
 
   public MockStorageEngine(MockStorageEngineConfig configuration, DrillbitContext context, String name) {
     this.configuration = configuration;
-    this.schema = new MockSchema(this);
+    this.schema = new MockSchema(this, name);
   }
 
   @Override
@@ -123,6 +123,11 @@ public class MockStorageEngine extends 
AbstractStoragePlugin {
       this.engine = engine;
     }
 
+    public MockSchema(MockStorageEngine engine, String name) {
+      super(ImmutableList.<String>of(), name);
+      this.engine = engine;
+    }
+
     @Override
     public Table getTable(String name) {
       Table table = tableCache.get(name);

http://git-wip-us.apache.org/repos/asf/drill/blob/18a71a38/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java
index cf64b20..f26848d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java
@@ -543,7 +543,7 @@ public class MetadataProvider {
   private static <S> PojoRecordReader<S> getPojoRecordReader(final 
InfoSchemaTableType tableType, final InfoSchemaFilter filter, final DrillConfig 
config,
       final SchemaTreeProvider provider, final UserSession userSession) {
     final SchemaPlus rootSchema =
-        provider.createRootSchema(userSession.getCredentials().getUserName(), 
newSchemaConfigInfoProvider(config, userSession, provider));
+        
provider.createFullRootSchema(userSession.getCredentials().getUserName(), 
newSchemaConfigInfoProvider(config, userSession, provider));
     return tableType.getRecordReader(rootSchema, filter, 
userSession.getOptions());
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/18a71a38/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSchema.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSchema.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSchema.java
new file mode 100644
index 0000000..9282eed
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSchema.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl;
+
+import org.apache.drill.test.BaseDirTestWatcher;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterMockStorageFixture;
+import org.apache.drill.test.DrillTest;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestSchema extends DrillTest {
+
+  @ClassRule
+  public static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
+  private static ClusterMockStorageFixture cluster;
+  private static ClientFixture client;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    cluster = ClusterFixture.builder(dirTestWatcher).buildCustomMockStorage();
+    boolean breakRegisterSchema = true;
+    // With a broken storage plugin that throws an exception in registerSchema, every query
+    // (even one against another storage plugin) fails if Drill still loads all schemas
+    // (including the broken one) before each query.
+    cluster.insertMockStorage("mock_broken", breakRegisterSchema);
+    cluster.insertMockStorage("mock_good", !breakRegisterSchema);
+    client = cluster.clientFixture();
+  }
+
+  @Test (expected = Exception.class)
+  public void testQueryBrokenStorage() throws Exception {
+    String sql = "SELECT id_i, name_s10 FROM `mock_broken`.`employees_5`";
+    try {
+      client.queryBuilder().sql(sql).run();
+    } catch (Exception ex) {
+      assertTrue(ex.getMessage().contains("VALIDATION ERROR: Schema"));
+      throw ex;
+    }
+  }
+
+  @Test
+  public void testQueryGoodStorage() throws Exception {
+    String sql = "SELECT id_i, name_s10 FROM `mock_good`.`employees_5`";
+    client.queryBuilder().sql(sql).run();
+  }
+
+  @Test
+  public void testQueryGoodStorageWithDefaultSchema() throws Exception {
+    String use_dfs = "use dfs.tmp";
+    client.queryBuilder().sql(use_dfs).run();
+    String sql = "SELECT id_i, name_s10 FROM `mock_good`.`employees_5`";
+    client.queryBuilder().sql(sql).run();
+  }
+
+  @Test (expected = Exception.class)
+  public void testUseBrokenStorage() throws Exception {
+    try {
+      String use_dfs = "use mock_broken";
+      client.queryBuilder().sql(use_dfs).run();
+    } catch (Exception ex) {
+      assertTrue(ex.getMessage().contains("VALIDATION ERROR: Schema"));
+      throw ex;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/18a71a38/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java 
b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java
index 82bcf75..dfd63de 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java
@@ -282,4 +282,8 @@ public class ClusterFixtureBuilder {
   public ClusterFixture build() {
     return new ClusterFixture(this);
   }
+
+  public ClusterMockStorageFixture buildCustomMockStorage() {
+    return new ClusterMockStorageFixture(this);
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/18a71a38/exec/java-exec/src/test/java/org/apache/drill/test/ClusterMockStorageFixture.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterMockStorageFixture.java
 
b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterMockStorageFixture.java
new file mode 100644
index 0000000..54d7bf0
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterMockStorageFixture.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistryImpl;
+import org.apache.drill.exec.store.mock.MockBreakageStorage;
+import org.apache.drill.exec.store.mock.MockStorageEngineConfig;
+
+public class ClusterMockStorageFixture extends ClusterFixture {
+  ClusterMockStorageFixture(ClusterFixtureBuilder builder) {
+    super(builder);
+  }
+
+  /**
+   * This should be called after the drillbits are started.
+   * @param name the name of the mock storage plugin to create
+   */
+  public void insertMockStorage(String name, boolean breakRegisterSchema) {
+    for (Drillbit bit : drillbits()) {
+      // Register the mock plugin with this bit's storage plugin registry.
+      final StoragePluginRegistry pluginRegistry = bit.getContext().getStorage();
+      MockStorageEngineConfig config = MockStorageEngineConfig.INSTANCE;
+      @SuppressWarnings("resource")
+      MockBreakageStorage plugin = new MockBreakageStorage(
+          MockStorageEngineConfig.INSTANCE, bit.getContext(), name);
+      ((StoragePluginRegistryImpl) pluginRegistry).definePlugin(name, config, plugin);
+
+      plugin.setBreakRegister(breakRegisterSchema);
+    }
+  }
+
+}
