IMPALA-6941: load more text scanner compression plugins

Add extensions for LZ4 and ZSTD (which are supported by Hadoop).
Even without a plugin this results in better behaviour because
we don't try to treat the files with unknown extensions as
uncompressed text.

Also allow loading tables containing files with unsupported
compression types. There was weird behaviour before, where we knew
of the file extension but didn't support querying the table -
the catalog would load the table but the impalad would fail
processing the catalog update. The simplest way to fix it
is to just allow loading the tables.

Similarly, make the "LOAD DATA" operation more permissive -
we can copy files into a directory even if we can't
decompress them.

Switch to always checking plugin version - running mismatched plugin
is inherently unsafe.

Testing:
Positive case where LZO is loaded is exercised. Added
coverage for negative case where LZO is disabled.

Fixed test gaps:
* Querying LZO table with LZO plugin not available.
* Interacting with tables with known but unsupported text
  compressions.
* Querying files with unknown compression suffixes (which are
  treated as uncompressed text).

Change-Id: If2a9c4a4a11bed81df706e9e834400bfedfe48e6
Reviewed-on: http://gerrit.cloudera.org:8080/10165
Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/f4f28d31
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/f4f28d31
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/f4f28d31

Branch: refs/heads/master
Commit: f4f28d310c08b97171a50147e283c1153fc57679
Parents: 6454b74
Author: Tim Armstrong <tarmstr...@cloudera.com>
Authored: Thu Apr 19 14:43:03 2018 -0700
Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Committed: Fri May 18 03:44:46 2018 +0000

----------------------------------------------------------------------
 be/src/common/global-flags.cc                   |   1 +
 be/src/exec/CMakeLists.txt                      |   2 +-
 be/src/exec/hdfs-lzo-text-scanner.cc            | 117 ---------------
 be/src/exec/hdfs-lzo-text-scanner.h             |  64 --------
 be/src/exec/hdfs-plugin-text-scanner.cc         | 150 +++++++++++++++++++
 be/src/exec/hdfs-plugin-text-scanner.h          |  93 ++++++++++++
 be/src/exec/hdfs-scan-node-base.cc              |  16 +-
 be/src/exec/hdfs-text-scanner.cc                |  47 +++---
 be/src/exec/hdfs-text-scanner.h                 |  15 ++
 common/fbs/CatalogObjects.fbs                   |   3 +-
 .../apache/impala/analysis/LoadDataStmt.java    |  11 --
 .../apache/impala/catalog/HdfsCompression.java  |  18 ++-
 .../apache/impala/catalog/HdfsFileFormat.java   |  39 -----
 .../apache/impala/catalog/HdfsPartition.java    |  11 --
 .../queries/QueryTest/disable-lzo-plugin.test   |   7 +
 .../unsupported-compression-partitions.test     |  28 ++++
 tests/custom_cluster/test_scanner_plugin.py     |  34 +++++
 tests/metadata/test_partition_metadata.py       |  82 +++++++++-
 18 files changed, 458 insertions(+), 280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/f4f28d31/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 66c0a6b..e9715ff 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -243,6 +243,7 @@ REMOVED_FLAG(rm_default_cpu_vcores);
 REMOVED_FLAG(rm_default_memory);
 REMOVED_FLAG(rpc_cnxn_attempts);
 REMOVED_FLAG(rpc_cnxn_retry_interval_ms);
+REMOVED_FLAG(skip_lzo_version_check);
 REMOVED_FLAG(staging_cgroup);
 REMOVED_FLAG(suppress_unknown_disk_id_warnings);
 REMOVED_FLAG(use_statestore);

http://git-wip-us.apache.org/repos/asf/impala/blob/f4f28d31/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 0317cfe..2349df4 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -53,8 +53,8 @@ add_library(Exec
   hdfs-avro-scanner.cc
   hdfs-avro-table-writer.cc
   hdfs-avro-scanner-ir.cc
+  hdfs-plugin-text-scanner.cc
   hdfs-text-scanner.cc
-  hdfs-lzo-text-scanner.cc
   hdfs-text-table-writer.cc
   hdfs-sequence-table-writer.cc
   hdfs-parquet-scanner.cc

http://git-wip-us.apache.org/repos/asf/impala/blob/f4f28d31/be/src/exec/hdfs-lzo-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-lzo-text-scanner.cc 
b/be/src/exec/hdfs-lzo-text-scanner.cc
deleted file mode 100644
index 8af89f2..0000000
--- a/be/src/exec/hdfs-lzo-text-scanner.cc
+++ /dev/null
@@ -1,117 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/hdfs-lzo-text-scanner.h"
-
-#include <hdfs.h>
-#include <boost/algorithm/string.hpp>
-#include "common/version.h"
-#include "exec/hdfs-scan-node.h"
-#include "exec/read-write-util.h"
-#include "runtime/runtime-state.h"
-#include "runtime/hdfs-fs-cache.h"
-#include "util/debug-util.h"
-#include "util/hdfs-util.h"
-#include "util/dynamic-util.h"
-
-#include "common/names.h"
-
-using namespace impala;
-
-DEFINE_bool(skip_lzo_version_check, false, "Disables checking the LZO library 
version "
-            "against the running Impala version.");
-
-const string HdfsLzoTextScanner::LIB_IMPALA_LZO = "libimpalalzo.so";
-
-namespace impala {
-Status HdfsLzoTextScanner::library_load_status_;
-
-SpinLock HdfsLzoTextScanner::lzo_load_lock_;
-
-const char* (*GetImpalaLzoBuildVersion)();
-
-HdfsScanner* (*HdfsLzoTextScanner::CreateLzoTextScanner)(
-    HdfsScanNodeBase* scan_node, RuntimeState* state);
-
-Status (*HdfsLzoTextScanner::LzoIssueInitialRanges)(
-    HdfsScanNodeBase* scan_node, const std::vector<HdfsFileDesc*>& files);
-
-HdfsScanner* HdfsLzoTextScanner::GetHdfsLzoTextScanner(
-    HdfsScanNodeBase* scan_node, RuntimeState* state) {
-
-  // If the scanner was not loaded then no scans could be issued so we should
-  // never get here without having loaded the scanner.
-  DCHECK(CreateLzoTextScanner != NULL);
-
-  return (*CreateLzoTextScanner)(scan_node, state);
-}
-
-Status HdfsLzoTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
-    const vector<HdfsFileDesc*>& files) {
-  DCHECK(!files.empty());
-  if (LzoIssueInitialRanges == NULL) {
-    lock_guard<SpinLock> l(lzo_load_lock_);
-    if (library_load_status_.ok()) {
-      // LzoIssueInitialRanges && library_load_status_.ok() means we haven't 
tried loading
-      // the library yet.
-      library_load_status_ = LoadLzoLibrary();
-      if (!library_load_status_.ok()) {
-        stringstream ss;
-        ss << "Error loading impala-lzo library. Check that the impala-lzo 
library "
-           << "is at version " << GetDaemonBuildVersion();
-        library_load_status_.AddDetail(ss.str());
-        return library_load_status_;
-      }
-    } else {
-      // We only try to load the library once.
-      return library_load_status_;
-    }
-  }
-
-  return (*LzoIssueInitialRanges)(scan_node, files);
-}
-
-Status HdfsLzoTextScanner::LoadLzoLibrary() {
-  void* handle;
-  RETURN_IF_ERROR(DynamicOpen(LIB_IMPALA_LZO.c_str(), &handle));
-  RETURN_IF_ERROR(DynamicLookup(handle,
-      "GetImpalaBuildVersion", 
reinterpret_cast<void**>(&GetImpalaLzoBuildVersion)));
-
-  if (strcmp((*GetImpalaLzoBuildVersion)(), GetDaemonBuildVersion()) != 0) {
-    stringstream ss;
-    ss << "Impala LZO library was built against Impala version "
-       << (*GetImpalaLzoBuildVersion)() << ", but the running Impala version 
is "
-       << GetDaemonBuildVersion();
-    if (FLAGS_skip_lzo_version_check) {
-      LOG(ERROR) << ss.str();
-    } else {
-      return Status(ss.str());
-    }
-  }
-
-  RETURN_IF_ERROR(DynamicLookup(handle,
-      "CreateLzoTextScanner", 
reinterpret_cast<void**>(&CreateLzoTextScanner)));
-  RETURN_IF_ERROR(DynamicLookup(handle,
-      "LzoIssueInitialRangesImpl", 
reinterpret_cast<void**>(&LzoIssueInitialRanges)));
-
-  DCHECK(CreateLzoTextScanner != NULL);
-  DCHECK(LzoIssueInitialRanges != NULL);
-  LOG(INFO) << "Loaded impala-lzo library: " << LIB_IMPALA_LZO;
-  return Status::OK();
-}
-
-}

http://git-wip-us.apache.org/repos/asf/impala/blob/f4f28d31/be/src/exec/hdfs-lzo-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-lzo-text-scanner.h 
b/be/src/exec/hdfs-lzo-text-scanner.h
deleted file mode 100644
index d6bddf9..0000000
--- a/be/src/exec/hdfs-lzo-text-scanner.h
+++ /dev/null
@@ -1,64 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_EXEC_HDFS_LZO_TEXT_SCANNER_H
-#define IMPALA_EXEC_HDFS_LZO_TEXT_SCANNER_H
-
-#include "common/status.h"
-#include "exec/scan-node.h"
-#include "exec/hdfs-scanner.h"
-#include "exec/hdfs-scan-node-base.h"
-#include "util/spinlock.h"
-
-namespace impala {
-
-/// This is a wrapper for calling the external HdfsLzoTextScanner
-/// The LZO scanner class is implemented in a dynamically linked library so 
that
-/// Impala does not include GPL code.  The two entry points are:
-/// IssueInitialRanges -- issue calls to the I/O manager to read the file 
headers
-/// GetHdfsLzoTextScanner -- returns a pointer to the Scanner object.
-class HdfsLzoTextScanner {
- public:
-  static HdfsScanner* GetHdfsLzoTextScanner(HdfsScanNodeBase* scan_node,
-      RuntimeState* state);
-  static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
-                                   const std::vector<HdfsFileDesc*>& files);
-
- private:
-  /// Impala LZO library name -- GPL code.
-  const static std::string LIB_IMPALA_LZO;
-
-  /// If non-OK, then we have tried and failed to load the LZO library.
-  static Status library_load_status_;
-
-  /// Lock to protect loading of the lzo file library.
-  static SpinLock lzo_load_lock_;
-
-  /// Dynamically linked function to create the Lzo Scanner Object.
-  static HdfsScanner* (*CreateLzoTextScanner)
-      (HdfsScanNodeBase* scan_node, RuntimeState* state);
-
-  /// Dynamically linked function to set the initial scan ranges.
-  static Status (*LzoIssueInitialRanges)(
-      HdfsScanNodeBase* scan_node, const std::vector<HdfsFileDesc*>& files);
-
-  /// Dynamically loads CreateLzoTextScanner and LzoIssueInitialRanges.
-  /// lzo_load_lock_ should be taken before calling this method.
-  static Status LoadLzoLibrary();
-};
-}
-#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/f4f28d31/be/src/exec/hdfs-plugin-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-plugin-text-scanner.cc 
b/be/src/exec/hdfs-plugin-text-scanner.cc
new file mode 100644
index 0000000..19e3313
--- /dev/null
+++ b/be/src/exec/hdfs-plugin-text-scanner.cc
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/hdfs-plugin-text-scanner.h"
+
+#include <algorithm>
+
+#include <hdfs.h>
+#include <boost/algorithm/string.hpp>
+#include "common/version.h"
+#include "exec/hdfs-scan-node.h"
+#include "exec/read-write-util.h"
+#include "runtime/runtime-state.h"
+#include "runtime/hdfs-fs-cache.h"
+#include "util/debug-util.h"
+#include "util/hdfs-util.h"
+#include "util/dynamic-util.h"
+
+#include "common/names.h"
+
+using namespace impala;
+
+using boost::algorithm::to_lower_copy;
+using boost::shared_lock;
+using boost::shared_mutex;
+using boost::upgrade_lock;
+using boost::upgrade_to_unique_lock;
+using std::find;
+
+// Allow LZO by default to maintain backwards compatibility. We can add more 
options
+// if we determine that the plugins are well-maintained and generally stable.
+DEFINE_string(enabled_hdfs_text_scanner_plugins, "LZO", "(Advanced) whitelist 
of HDFS "
+    "text scanner plugins that Impala will try to dynamically load. Must be a "
+    "comma-separated list of upper-case compression codec names. Each plugin 
implements "
+    "support for decompression and hands off the decompressed bytes to 
Impala's builtin "
+    "text parser for further processing (e.g. parsing delimited text).");
+
+static const string LIB_IMPALA_TEMPLATE = "libimpala$0.so";
+
+namespace impala {
+
+shared_mutex HdfsPluginTextScanner::library_load_lock_;
+
+std::unordered_map<string, HdfsPluginTextScanner::LoadedPlugin>
+    HdfsPluginTextScanner::loaded_plugins_;
+
+HdfsScanner* HdfsPluginTextScanner::GetHdfsPluginTextScanner(
+    HdfsScanNodeBase* scan_node, RuntimeState* state, const string& 
plugin_name) {
+  CreateScannerFn create_scanner_fn;
+  {
+    shared_lock<shared_mutex> l(library_load_lock_);
+    // If the scanner was not loaded then no scans could be issued so we should
+    // never get here without having loaded the scanner.
+    auto it = loaded_plugins_.find(plugin_name);
+    DCHECK(it != loaded_plugins_.end());
+    create_scanner_fn = it->second.create_scanner_fn;
+  }
+  return create_scanner_fn(scan_node, state);
+}
+
+Status HdfsPluginTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
+    const vector<HdfsFileDesc*>& files, const string& plugin_name) {
+  DCHECK(!files.empty());
+  IssueInitialRangesFn issue_initial_ranges_fn;
+  RETURN_IF_ERROR(CheckPluginEnabled(plugin_name));
+  {
+    upgrade_lock<shared_mutex> read_lock(library_load_lock_);
+    auto it = loaded_plugins_.find(plugin_name);
+    if (it == loaded_plugins_.end()) {
+      // We haven't tried loading the library yet.
+      upgrade_to_unique_lock<shared_mutex> write_lock(read_lock);
+      it = loaded_plugins_.insert(make_pair(plugin_name, 
LoadedPlugin())).first;
+      it->second.library_load_status = LoadPluginLibrary(plugin_name, 
&it->second);
+      if (!it->second.library_load_status.ok()) {
+        it->second.library_load_status.AddDetail(Substitute(
+              "Error loading plugin library for $0. Check that the library is 
at "
+              "version $1", plugin_name, GetDaemonBuildVersion()));
+        return it->second.library_load_status;
+      }
+    } else {
+      // We only try to load the library once - propagate the error if it 
previously
+      // failed.
+      RETURN_IF_ERROR(it->second.library_load_status);
+    }
+    issue_initial_ranges_fn = it->second.issue_initial_ranges_fn;
+  }
+
+  return issue_initial_ranges_fn(scan_node, files);
+}
+
+Status HdfsPluginTextScanner::CheckPluginEnabled(const string& plugin_name) {
+  vector<string> enabled_plugins;
+  boost::split(enabled_plugins, FLAGS_enabled_hdfs_text_scanner_plugins,
+      boost::is_any_of(","));
+  if (find(enabled_plugins.begin(), enabled_plugins.end(), plugin_name)
+      == enabled_plugins.end()) {
+    return Status(Substitute("Scanner plugin '$0' is not one of the enabled 
plugins: '$1'",
+          plugin_name, FLAGS_enabled_hdfs_text_scanner_plugins));
+  }
+  return Status::OK();
+}
+
+Status HdfsPluginTextScanner::LoadPluginLibrary(const string& plugin_name,
+    LoadedPlugin* plugin) {
+  RETURN_IF_ERROR(CheckPluginEnabled(plugin_name));
+  GetPluginImpalaBuildVersionFn get_plugin_impala_build_version;
+  void* handle;
+  string lib_name = Substitute(LIB_IMPALA_TEMPLATE, 
to_lower_copy(plugin_name));
+  RETURN_IF_ERROR(DynamicOpen(lib_name.c_str(), &handle));
+  RETURN_IF_ERROR(DynamicLookup(handle, "GetImpalaBuildVersion",
+      reinterpret_cast<void**>(&get_plugin_impala_build_version)));
+  if (strcmp(get_plugin_impala_build_version(), GetDaemonBuildVersion()) != 0) 
{
+    return Status(Substitute(
+        "Scanner plugin $0 was built against Impala version $1 but the running 
Impala "
+        "version is $2", plugin_name, get_plugin_impala_build_version(),
+        GetDaemonBuildVersion()));
+  }
+
+  // Camel case the library name to generate correct symbol, e.g. 
"CreateFooTextScanner".
+  string plugin_camelcase = to_lower_copy(plugin_name);
+  plugin_camelcase[0] = toupper(plugin_camelcase[0]);
+  string create_symbol = Substitute("Create$0TextScanner", plugin_camelcase);
+  string issue_initial_ranges_symbol =
+        Substitute("$0IssueInitialRangesImpl", plugin_camelcase);
+  RETURN_IF_ERROR(DynamicLookup(handle, create_symbol.c_str(),
+        reinterpret_cast<void**>(&plugin->create_scanner_fn)));
+  RETURN_IF_ERROR(DynamicLookup(handle, issue_initial_ranges_symbol.c_str(),
+        reinterpret_cast<void**>(&plugin->issue_initial_ranges_fn)));
+
+  DCHECK(plugin->create_scanner_fn != nullptr);
+  DCHECK(plugin->issue_initial_ranges_fn != nullptr);
+  LOG(INFO) << "Loaded plugin library for " << plugin_name << ": " << lib_name;
+  return Status::OK();
+}
+
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/f4f28d31/be/src/exec/hdfs-plugin-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-plugin-text-scanner.h 
b/be/src/exec/hdfs-plugin-text-scanner.h
new file mode 100644
index 0000000..4b00165
--- /dev/null
+++ b/be/src/exec/hdfs-plugin-text-scanner.h
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_HDFS_PLUGIN_TEXT_SCANNER_H
+#define IMPALA_EXEC_HDFS_PLUGIN_TEXT_SCANNER_H
+
+#include <boost/thread/shared_mutex.hpp>
+
+#include "common/status.h"
+#include "exec/scan-node.h"
+#include "exec/hdfs-scanner.h"
+#include "exec/hdfs-scan-node-base.h"
+
+namespace impala {
+
+/// This is a wrapper for calling external implementations of text scanners for
+/// compression formats that Impala does not have builtin support for.
+/// The plugin scanners are implemented in dynamically linked libraries.
+///
+/// The two entry points are:
+/// IssueInitialRanges -- issue calls to the I/O manager to read the file 
headers
+/// GetHdfsPluginTextScanner -- returns a pointer to the Scanner object.
+///
+/// Plugin names should all be upper case. If the plugin name is FOO, then the 
plugin
+/// library must be called "libimpalafoo.so" and it must contain the following 
exported
+/// functions:
+///   const char* GetImpalaBuildVersion();
+///
+///   void FooIssueInitialRangesImpl(HdfsScanNodeBase*,
+///                                const std::vector<HdfsFileDesc*>&);
+///
+///   HdfsScanner* CreateFooTextScanner(HdfsScanNodeBase*, RuntimeState*);
+///
+class HdfsPluginTextScanner {
+ public:
+  static HdfsScanner* GetHdfsPluginTextScanner(HdfsScanNodeBase* scan_node,
+      RuntimeState* state, const std::string& plugin_name);
+  static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
+     const std::vector<HdfsFileDesc*>& files, const std::string& plugin_name);
+
+ private:
+  // Typedefs for functions loaded from plugin shared objects.
+  typedef const char* (*GetPluginImpalaBuildVersionFn)();
+  typedef HdfsScanner* (*CreateScannerFn)
+      (HdfsScanNodeBase* scan_node, RuntimeState* state);
+  typedef Status (*IssueInitialRangesFn)(
+      HdfsScanNodeBase* scan_node, const std::vector<HdfsFileDesc*>& files);
+
+  struct LoadedPlugin {
+    /// If non-OK, then we have tried and failed to load this plugin.
+    Status library_load_status;
+
+    /// Dynamically linked function to create the Scanner Object.
+    CreateScannerFn create_scanner_fn = nullptr;
+
+    /// Dynamically linked function to issue the initial scan ranges.
+    IssueInitialRangesFn issue_initial_ranges_fn = nullptr;
+  };
+
+  /// Lock to protect loading of libraries and 'loaded_plugins_'. We only 
allow loading a
+  /// single library at a time. Must be held in shared mode when accessing
+  /// 'loaded_plugins_' and exclusive mode when loading a library.
+  static boost::shared_mutex library_load_lock_;
+
+  /// Map from upper case plugin name to the loaded plugin.
+  /// Protected by 'library_load_lock_. Entries are never removed once loaded.
+  static std::unordered_map<std::string, LoadedPlugin> loaded_plugins_;
+
+  /// Return an error if the specified plugin isn't enabled.
+  static Status CheckPluginEnabled(const std::string& plugin_name);
+
+  /// Dynamically loads the required functions for the plugin identified by 
'plugin_name'
+  /// and populates 'create_scanner_fn' and 'issue_initial_ranges_fn' in 
'plugin'.
+  /// Returns an error if an error is encountered with loading the library or 
the
+  /// functions. 'library_load_lock_' must be held by the caller in exclusive 
mode.
+  static Status LoadPluginLibrary(const std::string& plugin_name, 
LoadedPlugin* plugin);
+};
+}
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/f4f28d31/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc 
b/be/src/exec/hdfs-scan-node-base.cc
index a40323b..8957efc 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -16,9 +16,10 @@
 // under the License.
 
 #include "exec/hdfs-scan-node-base.h"
+
+#include "exec/hdfs-plugin-text-scanner.h"
 #include "exec/base-sequence-scanner.h"
 #include "exec/hdfs-text-scanner.h"
-#include "exec/hdfs-lzo-text-scanner.h"
 #include "exec/hdfs-sequence-scanner.h"
 #include "exec/hdfs-rcfile-scanner.h"
 #include "exec/hdfs-avro-scanner.h"
@@ -635,12 +636,15 @@ Status 
HdfsScanNodeBase::CreateAndOpenScanner(HdfsPartitionDescriptor* partition
   // Create a new scanner for this file format and compression.
   switch (partition->file_format()) {
     case THdfsFileFormat::TEXT:
-      // Lzo-compressed text files are scanned by a scanner that it is 
implemented as a
-      // dynamic library, so that Impala does not include GPL code.
-      if (compression == THdfsCompression::LZO) {
-        scanner->reset(HdfsLzoTextScanner::GetHdfsLzoTextScanner(this, 
runtime_state_));
-      } else {
+      if (HdfsTextScanner::HasBuiltinSupport(compression)) {
         scanner->reset(new HdfsTextScanner(this, runtime_state_));
+      } else {
+        // No builtin support - we must have loaded the plugin in 
IssueInitialRanges().
+        auto it = _THdfsCompression_VALUES_TO_NAMES.find(compression);
+        DCHECK(it != _THdfsCompression_VALUES_TO_NAMES.end())
+            << "Already issued ranges for this compression type.";
+        scanner->reset(HdfsPluginTextScanner::GetHdfsPluginTextScanner(
+            this, runtime_state_, it->second));
       }
       break;
     case THdfsFileFormat::SEQUENCE_FILE:

http://git-wip-us.apache.org/repos/asf/impala/blob/f4f28d31/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 1b08c97..3e4c223 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -22,7 +22,7 @@
 #include "codegen/llvm-codegen.h"
 #include "exec/delimited-text-parser.h"
 #include "exec/delimited-text-parser.inline.h"
-#include "exec/hdfs-lzo-text-scanner.h"
+#include "exec/hdfs-plugin-text-scanner.h"
 #include "exec/hdfs-scan-node.h"
 #include "exec/scanner-context.inline.h"
 #include "exec/text-converter.h"
@@ -77,7 +77,7 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* 
scan_node,
     const vector<HdfsFileDesc*>& files) {
   vector<ScanRange*> compressed_text_scan_ranges;
   int compressed_text_files = 0;
-  vector<HdfsFileDesc*> lzo_text_files;
+  map<string, vector<HdfsFileDesc*>> plugin_text_files;
   for (int i = 0; i < files.size(); ++i) {
     THdfsCompression::type compression = files[i]->file_compression;
     switch (compression) {
@@ -124,35 +124,36 @@ Status 
HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
         }
         break;
 
-      case THdfsCompression::LZO:
-        // lzo-compressed text need to be processed by the specialized 
HdfsLzoTextScanner.
-        // Note that any LZO_INDEX files (no matter what the case of their 
suffix) will be
-        // filtered by the planner.
-        {
-        #ifndef NDEBUG
-          // No straightforward way to do this in one line inside a DCHECK, so 
for once
-          // we'll explicitly use NDEBUG to avoid executing debug-only code.
-          string lower_filename = files[i]->filename;
-          to_lower(lower_filename);
-          DCHECK(!ends_with(lower_filename, LZO_INDEX_SUFFIX));
-        #endif
-          lzo_text_files.push_back(files[i]);
+      default: {
+        // Other compression formats are only supported by a plugin.
+        auto it = _THdfsCompression_VALUES_TO_NAMES.find(compression);
+        if (it == _THdfsCompression_VALUES_TO_NAMES.end()) {
+          return Status(Substitute(
+                "Unexpected compression enum value: $0", 
static_cast<int>(compression)));
         }
-        break;
-
-      default:
-        DCHECK(false);
+#ifndef NDEBUG
+        // Note any LZO_INDEX files (no matter what the case of their suffix) 
should be
+        // filtered by the planner.
+        // No straightforward way to do this in one line inside a DCHECK, so 
for once
+        // we'll explicitly use NDEBUG to avoid executing debug-only code.
+        string lower_filename = files[i]->filename;
+        to_lower(lower_filename);
+        DCHECK(!ends_with(lower_filename, LZO_INDEX_SUFFIX));
+#endif
+        plugin_text_files[it->second].push_back(files[i]);
+      }
     }
   }
   if (compressed_text_scan_ranges.size() > 0) {
     RETURN_IF_ERROR(scan_node->AddDiskIoRanges(compressed_text_scan_ranges,
         compressed_text_files));
   }
-  if (lzo_text_files.size() > 0) {
-    // This will dlopen the lzo binary and can fail if the lzo binary is not 
present.
-    RETURN_IF_ERROR(HdfsLzoTextScanner::IssueInitialRanges(scan_node, 
lzo_text_files));
+  for (const auto& entry : plugin_text_files) {
+    DCHECK_GT(entry.second.size(), 0) << "List should be non-empty";
+    // This can fail if the plugin library can't be loaded.
+    RETURN_IF_ERROR(HdfsPluginTextScanner::IssueInitialRanges(
+          scan_node, entry.second, entry.first));
   }
-
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/f4f28d31/be/src/exec/hdfs-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index 25886ba..5b15372 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -65,6 +65,21 @@ class HdfsTextScanner : public HdfsScanner {
       const std::vector<ScalarExpr*>& conjuncts,
       llvm::Function** write_aligned_tuples_fn) WARN_UNUSED_RESULT;
 
+  /// Return true if we have builtin support for scanning text files 
compressed with this
+  /// codec.
+  static bool HasBuiltinSupport(THdfsCompression::type compression) {
+    switch (compression) {
+      case THdfsCompression::NONE:
+      case THdfsCompression::GZIP:
+      case THdfsCompression::SNAPPY:
+      case THdfsCompression::SNAPPY_BLOCKED:
+      case THdfsCompression::BZIP2:
+        return true;
+      default:
+        return false;
+    }
+  }
+
   /// Suffix for lzo index files.
   const static std::string LZO_INDEX_SUFFIX;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/f4f28d31/common/fbs/CatalogObjects.fbs
----------------------------------------------------------------------
diff --git a/common/fbs/CatalogObjects.fbs b/common/fbs/CatalogObjects.fbs
index bf44380..c08099d 100644
--- a/common/fbs/CatalogObjects.fbs
+++ b/common/fbs/CatalogObjects.fbs
@@ -29,7 +29,8 @@ enum FbCompression: byte {
   SNAPPY_BLOCKED,
   LZO,
   LZ4,
-  ZLIB
+  ZLIB,
+  ZSTD
 }
 
 table FbFileBlock {

http://git-wip-us.apache.org/repos/asf/impala/blob/f4f28d31/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
index 114862e..ddc0c6e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
@@ -223,17 +223,6 @@ public class LoadDataStmt extends StatementBase {
         }
       }
       Preconditions.checkNotNull(partition);
-
-      // Verify the files being loaded are supported.
-      for (FileStatus fStatus: fs.listStatus(source)) {
-        if (fs.isDirectory(fStatus.getPath())) continue;
-        StringBuilder errorMsg = new StringBuilder();
-        HdfsFileFormat fileFormat = 
partition.getInputFormatDescriptor().getFileFormat();
-        if 
(!fileFormat.isFileCompressionTypeSupported(fStatus.getPath().toString(),
-          errorMsg)) {
-          throw new AnalysisException(errorMsg.toString());
-        }
-      }
     } catch (FileNotFoundException e) {
       throw new AnalysisException("File not found: " + e.getMessage(), e);
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/impala/blob/f4f28d31/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java
index dd81587..23282c3 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java
@@ -27,10 +27,11 @@ import com.google.common.collect.ImmutableMap;
  * Support for recognizing compression suffixes on data files.
  * Compression of a file is recognized in mapreduce by looking for suffixes of
  * supported codecs.
- * For now Impala supports LZO, GZIP, SNAPPY, and BZIP2. LZO can use the 
specific HIVE
- * input class.
+ * For now Impala supports LZO, GZIP, SNAPPY, BZIP2 and some additional 
formats if plugins
+ * are available. Even if a plugin is available, we need to add the file 
suffixes here so
+ * that we can resolve the compression type from the file name. LZO can use 
the specific
+ * HIVE input class.
  */
-// TODO: Add LZ4?
 public enum HdfsCompression {
   NONE,
   DEFLATE,
@@ -38,7 +39,9 @@ public enum HdfsCompression {
   BZIP2,
   SNAPPY,
   LZO,
-  LZO_INDEX; //Lzo index file.
+  LZO_INDEX, //Lzo index file.
+  LZ4,
+  ZSTD;
 
   /* Map from a suffix to a compression type */
   private static final ImmutableMap<String, HdfsCompression> SUFFIX_MAP =
@@ -49,6 +52,8 @@ public enum HdfsCompression {
           put("snappy", SNAPPY).
           put("lzo", LZO).
           put("index", LZO_INDEX).
+          put("lz4", LZ4).
+          put("zst", ZSTD).
           build();
 
   /* Given a file name return its compression type, if any. */
@@ -71,6 +76,8 @@ public enum HdfsCompression {
     case BZIP2: return THdfsCompression.BZIP2;
     case SNAPPY: return THdfsCompression.SNAPPY_BLOCKED;
     case LZO: return THdfsCompression.LZO;
+    case LZ4: return THdfsCompression.LZ4;
+    case ZSTD: return THdfsCompression.ZSTD;
     default: throw new IllegalStateException("Unexpected codec: " + this);
     }
   }
@@ -83,13 +90,14 @@ public enum HdfsCompression {
       case BZIP2: return FbCompression.BZIP2;
       case SNAPPY: return FbCompression.SNAPPY;
       case LZO: return FbCompression.LZO;
+      case LZ4: return FbCompression.LZ4;
+      case ZSTD: return FbCompression.ZSTD;
       default: throw new IllegalStateException("Unexpected codec: " + this);
     }
   }
 
   /* Returns a compression type based on (Hive's) input format. Special case 
for LZO. */
   public static HdfsCompression fromHdfsInputFormatClass(String 
inputFormatClass) {
-    // TODO: Remove when we have the native LZO writer.
     Preconditions.checkNotNull(inputFormatClass);
     if (inputFormatClass.equals(HdfsFileFormat.LZO_TEXT.inputFormat())) {
       return LZO;

http://git-wip-us.apache.org/repos/asf/impala/blob/f4f28d31/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
index 32cae72..46c2b87 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
@@ -201,45 +201,6 @@ public enum HdfsFileFormat {
     }
   }
 
-  /*
-   * Checks whether a file is supported in Impala based on the file extension.
-   * Returns true if the file format is supported. If the file format is not
-   * supported, then it returns false and 'errorMsg' contains details on the
-   * incompatibility.
-   *
-   * Impala supports LZO, GZIP, SNAPPY and BZIP2 on text files for partitions 
that have
-   * been declared in the metastore as TEXT. LZO files can have their own 
input format.
-   * For now, raise an error on any other type.
-   */
-  public boolean isFileCompressionTypeSupported(String fileName,
-      StringBuilder errorMsg) {
-    // Check to see if the file has a compression suffix.
-    // TODO: Add LZ4
-    HdfsCompression compressionType = HdfsCompression.fromFileName(fileName);
-    switch (compressionType) {
-      case LZO:
-      case LZO_INDEX:
-        // Index files are read by the LZO scanner directly.
-      case GZIP:
-      case SNAPPY:
-      case BZIP2:
-      case NONE:
-        return true;
-      case DEFLATE:
-        // TODO: Ensure that text/deflate works correctly
-        if (this == TEXT) {
-          errorMsg.append("Expected compressed text file with 
{.lzo,.gzip,.snappy,.bz2} "
-              + "suffix: " + fileName);
-          return false;
-        } else {
-          return true;
-        }
-      default:
-        errorMsg.append("Unknown compression suffix: " + fileName);
-        return false;
-    }
-  }
-
   /**
    * Returns true if this file format with the given compression format is 
splittable.
    */

http://git-wip-us.apache.org/repos/asf/impala/blob/f4f28d31/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index e0850c6..1b05804 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -770,17 +770,6 @@ public class HdfsPartition implements 
Comparable<HdfsPartition> {
     } else {
       hmsParameters_ = Maps.newHashMap();
     }
-
-    // TODO: instead of raising an exception, we should consider marking this 
partition
-    // invalid and moving on, so that table loading won't fail and user can 
query other
-    // partitions.
-    for (FileDescriptor fileDescriptor: fileDescriptors_) {
-      StringBuilder errorMsg = new StringBuilder();
-      if 
(!getInputFormatDescriptor().getFileFormat().isFileCompressionTypeSupported(
-          fileDescriptor.getFileName(), errorMsg)) {
-        throw new RuntimeException(errorMsg.toString());
-      }
-    }
   }
 
   public HdfsPartition(HdfsTable table,

http://git-wip-us.apache.org/repos/asf/impala/blob/f4f28d31/testdata/workloads/functional-query/queries/QueryTest/disable-lzo-plugin.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/disable-lzo-plugin.test 
b/testdata/workloads/functional-query/queries/QueryTest/disable-lzo-plugin.test
new file mode 100644
index 0000000..b141fd9
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/disable-lzo-plugin.test
@@ -0,0 +1,7 @@
+====
+---- QUERY
+# Test that running with plugin disabled fails gracefully.
+select * from functional_text_lzo.alltypes
+---- CATCH
+Scanner plugin 'LZO' is not one of the enabled plugins: ''
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/f4f28d31/testdata/workloads/functional-query/queries/QueryTest/unsupported-compression-partitions.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/unsupported-compression-partitions.test
 
b/testdata/workloads/functional-query/queries/QueryTest/unsupported-compression-partitions.test
new file mode 100644
index 0000000..23199cc
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/unsupported-compression-partitions.test
@@ -0,0 +1,28 @@
+====
+---- QUERY
+# Test that querying only partitions with supported formats works as expected.
+select count(*)
+from multi_text_compression where month <= 2
+---- TYPES
+BIGINT
+---- RESULTS
+590
+====
+---- QUERY
+# Test that querying partition with unsupported plugin fails gracefully.
+select count(*)
+from multi_text_compression where month <= 3
+---- CATCH
+Scanner plugin 'LZ4' is not one of the enabled plugins: 'LZO'
+====
+---- QUERY
+# Unknown compression suffix is treated as uncompressed text.
+select id
+from multi_text_compression where month = 4
+---- RESULTS
+---- TYPES
+INT
+---- ERRORS
+Error converting column: 0 to INT
+Error parsing row: file: __HDFS_FILENAME__, before offset: 16
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/f4f28d31/tests/custom_cluster/test_scanner_plugin.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_scanner_plugin.py 
b/tests/custom_cluster/test_scanner_plugin.py
new file mode 100644
index 0000000..e30e6f5
--- /dev/null
+++ b/tests/custom_cluster/test_scanner_plugin.py
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+class TestScannerPlugin(CustomClusterTestSuite):
+  """Tests that involve changing the scanner plugin option."""
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--enabled_hdfs_text_scanner_plugins=")
+  def test_disable_lzo_plugin(self, vector):
+    """Test that we can gracefully handle a disabled plugin."""
+    # Should be able to query valid partitions only.
+    self.run_test_case('QueryTest/disable-lzo-plugin', vector)

http://git-wip-us.apache.org/repos/asf/impala/blob/f4f28d31/tests/metadata/test_partition_metadata.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_partition_metadata.py 
b/tests/metadata/test_partition_metadata.py
index a6f635a..99eff1e 100644
--- a/tests/metadata/test_partition_metadata.py
+++ b/tests/metadata/test_partition_metadata.py
@@ -17,8 +17,9 @@
 
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal
-from tests.common.test_dimensions import create_single_exec_option_dimension
-from tests.util.filesystem_utils import WAREHOUSE
+from tests.common.test_dimensions import (create_single_exec_option_dimension,
+    create_uncompressed_text_dimension)
+from tests.util.filesystem_utils import get_fs_path, WAREHOUSE
 
 # Map from the test dimension file_format string to the SQL "STORED AS"
 # argument.
@@ -145,3 +146,80 @@ class TestPartitionMetadata(ImpalaTestSuite):
     self.client.execute("select * from %s" % FQ_TBL_IMP)
     # Make sure the table remains accessible in HIVE
     self.run_stmt_in_hive("select * from %s" % FQ_TBL_IMP)
+
+
+class TestPartitionMetadataUncompressedTextOnly(ImpalaTestSuite):
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestPartitionMetadataUncompressedTextOnly, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(
+        create_uncompressed_text_dimension(cls.get_workload()))
+
+  @SkipIfLocal.hdfs_client
+  def test_unsupported_text_compression(self, vector, unique_database):
+    """Test querying tables with a mix of supported and unsupported 
compression codecs.
+    Should be able to query partitions with supported codecs."""
+    TBL_NAME = "multi_text_compression"
+    FQ_TBL_NAME = unique_database + "." + TBL_NAME
+    TBL_LOCATION = get_fs_path(
+        '{0}/{1}.db/{2}'.format(WAREHOUSE, unique_database, TBL_NAME))
+
+    file_format = vector.get_value('table_format').file_format
+    # Clean up any existing data in the table directory.
+    self.filesystem_client.delete_file_dir(TBL_NAME, recursive=True)
+    # Create the table
+    self.client.execute(
+        "create external table {0} like functional.alltypes location 
'{1}'".format(
+        FQ_TBL_NAME, TBL_LOCATION))
+
+    self.__add_alltypes_partition(vector, FQ_TBL_NAME, "functional", 2009, 1)
+    self.__add_alltypes_partition(vector, FQ_TBL_NAME, "functional_text_lzo", 
2009, 2)
+
+    # Create a new partition with a bogus file with the unsupported LZ4 suffix.
+    lz4_year = 2009
+    lz4_month = 3
+    lz4_ym_partition_loc = self.__make_ym_partition_dir(TBL_LOCATION, 
lz4_year, lz4_month)
+    
self.filesystem_client.create_file("{0}/fake.lz4".format(lz4_ym_partition_loc)[1:],
+        "some test data")
+    self.client.execute(
+        "alter table {0} add partition (year={1}, month={2}) location 
'{3}'".format(
+        FQ_TBL_NAME, lz4_year, lz4_month, lz4_ym_partition_loc))
+
+    # Create a new partition with a bogus compression codec.
+    fake_comp_year = 2009
+    fake_comp_month = 4
+    fake_comp_ym_partition_loc = self.__make_ym_partition_dir(
+        TBL_LOCATION, fake_comp_year, fake_comp_month)
+    self.filesystem_client.create_file(
+        "{0}/fake.fake_comp".format(fake_comp_ym_partition_loc)[1:], "fake 
compression")
+    self.client.execute(
+        "alter table {0} add partition (year={1}, month={2}) location 
'{3}'".format(
+        FQ_TBL_NAME, fake_comp_year, fake_comp_month, 
fake_comp_ym_partition_loc))
+
+    show_files_result = self.client.execute("show files in 
{0}".format(FQ_TBL_NAME))
+    assert len(show_files_result.data) == 4, "Expected one file per partition 
dir"
+
+    self.run_test_case('QueryTest/unsupported-compression-partitions', vector,
+        unique_database)
+
+  def __add_alltypes_partition(self, vector, dst_tbl, src_db, year, month):
+    """Add the (year, month) partition from src_db.alltypes to dst_tbl."""
+    tbl_location = self._get_table_location("{0}.alltypes".format(src_db), 
vector)
+    part_location = "{0}/year={1}/month={2}".format(tbl_location, year, month)
+    self.client.execute(
+        "alter table {0} add partition (year={1}, month={2}) location 
'{3}'".format(
+        dst_tbl, year, month, part_location))
+
+  def __make_ym_partition_dir(self, tbl_location, year, month):
+    """Create the year/month partition directory and return the path."""
+    y_partition_loc = "{0}/year={1}".format(tbl_location, year)
+    ym_partition_loc = "{0}/month={1}".format(y_partition_loc, month)
+    self.filesystem_client.delete_file_dir(tbl_location[1:], recursive=True)
+    self.filesystem_client.make_dir(tbl_location[1:])
+    self.filesystem_client.make_dir(y_partition_loc[1:])
+    self.filesystem_client.make_dir(ym_partition_loc[1:])
+    return ym_partition_loc

Reply via email to