Revert "IMPALA-6215: Removes race when using LibCache."

This reverts commit 4aafa5e9ba9fe22d2dbc7764a796b3cd04136cc0.

See IMPALA-6488 for an example of a crash that this revert is
trying to avoid.

Change-Id: I2e0a22d38f15fb3e34f08633ab0fc7c87c92d40f
Reviewed-on: http://gerrit.cloudera.org:8080/9244
Reviewed-by: Alex Behm <alex.b...@cloudera.com>
Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com>
Reviewed-by: Vuk Ercegovac <vercego...@cloudera.com>
Tested-by: Thomas Tauber-Marshall <tmarsh...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d609fe1b
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d609fe1b
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d609fe1b

Branch: refs/heads/2.x
Commit: d609fe1b44745abc979dc8b7884ecff356ea73cf
Parents: 1ddb156
Author: Vuk Ercegovac <vercego...@cloudera.com>
Authored: Wed Feb 7 11:24:50 2018 -0800
Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org>
Committed: Thu Feb 8 07:01:53 2018 +0000

----------------------------------------------------------------------
 be/src/codegen/llvm-codegen.cc               |  5 +-
 be/src/exec/external-data-source-executor.cc |  5 +-
 be/src/exprs/hive-udf-call.cc                | 57 ++++++++-------
 be/src/exprs/hive-udf-call.h                 |  3 +
 be/src/runtime/lib-cache.cc                  | 22 +++---
 be/src/runtime/lib-cache.h                   | 44 ++----------
 be/src/service/fe-support.cc                 | 11 ++-
 tests/query_test/test_udfs.py                | 88 ++---------------------
 8 files changed, 58 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/d609fe1b/be/src/codegen/llvm-codegen.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen.cc b/be/src/codegen/llvm-codegen.cc
index 72293a7..e1a606c 100644
--- a/be/src/codegen/llvm-codegen.cc
+++ b/be/src/codegen/llvm-codegen.cc
@@ -327,10 +327,9 @@ Status LlvmCodeGen::LinkModuleFromLocalFs(const string& 
file) {
 
 Status LlvmCodeGen::LinkModuleFromHdfs(const string& hdfs_location) {
   if (linked_modules_.find(hdfs_location) != linked_modules_.end()) return 
Status::OK();
-  LibCacheEntryHandle handle;
   string local_path;
-  RETURN_IF_ERROR(LibCache::instance()->GetLocalPath(hdfs_location, 
LibCache::TYPE_IR,
-      &handle, &local_path));
+  RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath(hdfs_location, 
LibCache::TYPE_IR,
+      &local_path));
   RETURN_IF_ERROR(LinkModuleFromLocalFs(local_path));
   linked_modules_.insert(hdfs_location);
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/impala/blob/d609fe1b/be/src/exec/external-data-source-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/external-data-source-executor.cc 
b/be/src/exec/external-data-source-executor.cc
index 20fe50e..7c54f39 100644
--- a/be/src/exec/external-data-source-executor.cc
+++ b/be/src/exec/external-data-source-executor.cc
@@ -136,10 +136,9 @@ ExternalDataSourceExecutor::~ExternalDataSourceExecutor() {
 Status ExternalDataSourceExecutor::Init(const string& jar_path,
     const string& class_name, const string& api_version, const string& 
init_string) {
   DCHECK(!is_initialized_);
-  LibCacheEntryHandle handle;
   string local_jar_path;
-  RETURN_IF_ERROR(LibCache::instance()->GetLocalPath(
-      jar_path, LibCache::TYPE_JAR, &handle, &local_jar_path));
+  RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath(
+      jar_path, LibCache::TYPE_JAR, &local_jar_path));
 
   JNIEnv* jni_env = getJNIEnv();
 

http://git-wip-us.apache.org/repos/asf/impala/blob/d609fe1b/be/src/exprs/hive-udf-call.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/hive-udf-call.cc b/be/src/exprs/hive-udf-call.cc
index e1ac676..19e2e63 100644
--- a/be/src/exprs/hive-udf-call.cc
+++ b/be/src/exprs/hive-udf-call.cc
@@ -174,6 +174,10 @@ Status HiveUdfCall::Init(const RowDescriptor& row_desc, 
RuntimeState* state) {
   // Initialize children first.
   RETURN_IF_ERROR(ScalarExpr::Init(row_desc, state));
 
+  // Copy the Hive Jar from hdfs to local file system.
+  RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath(
+      fn_.hdfs_location, LibCache::TYPE_JAR, &local_location_));
+
   // Initialize input_byte_offsets_ and input_buffer_size_
   for (int i = 0; i < GetNumChildren(); ++i) {
     input_byte_offsets_.push_back(input_buffer_size_);
@@ -198,35 +202,30 @@ Status 
HiveUdfCall::OpenEvaluator(FunctionContext::FunctionStateScope scope,
   JNIEnv* env = getJNIEnv();
   if (env == NULL) return Status("Failed to get/create JVM");
 
-  {
-    LibCacheEntryHandle handle;
-    string local_location;
-    RETURN_IF_ERROR(LibCache::instance()->GetLocalPath(
-        fn_.hdfs_location, LibCache::TYPE_JAR, &handle, &local_location));
-    THiveUdfExecutorCtorParams ctor_params;
-    ctor_params.fn = fn_;
-    ctor_params.local_location = local_location;
-    ctor_params.input_byte_offsets = input_byte_offsets_;
-
-    jni_ctx->input_values_buffer = new uint8_t[input_buffer_size_];
-    jni_ctx->input_nulls_buffer = new uint8_t[GetNumChildren()];
-    jni_ctx->output_value_buffer = new uint8_t[type().GetSlotSize()];
-
-    ctor_params.input_buffer_ptr = (int64_t)jni_ctx->input_values_buffer;
-    ctor_params.input_nulls_ptr = (int64_t)jni_ctx->input_nulls_buffer;
-    ctor_params.output_buffer_ptr = (int64_t)jni_ctx->output_value_buffer;
-    ctor_params.output_null_ptr = (int64_t)&jni_ctx->output_null_value;
-
-    jbyteArray ctor_params_bytes;
-
-    // Add a scoped cleanup jni reference object. This cleans up local refs 
made below.
-    JniLocalFrame jni_frame;
-    RETURN_IF_ERROR(jni_frame.push(env));
-
-    RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
-    // Create the java executor object
-    jni_ctx->executor = env->NewObject(executor_cl_, executor_ctor_id_, 
ctor_params_bytes);
-  }
+  THiveUdfExecutorCtorParams ctor_params;
+  ctor_params.fn = fn_;
+  ctor_params.local_location = local_location_;
+  ctor_params.input_byte_offsets = input_byte_offsets_;
+
+  jni_ctx->input_values_buffer = new uint8_t[input_buffer_size_];
+  jni_ctx->input_nulls_buffer = new uint8_t[GetNumChildren()];
+  jni_ctx->output_value_buffer = new uint8_t[type().GetSlotSize()];
+
+  ctor_params.input_buffer_ptr = (int64_t)jni_ctx->input_values_buffer;
+  ctor_params.input_nulls_ptr = (int64_t)jni_ctx->input_nulls_buffer;
+  ctor_params.output_buffer_ptr = (int64_t)jni_ctx->output_value_buffer;
+  ctor_params.output_null_ptr = (int64_t)&jni_ctx->output_null_value;
+
+  jbyteArray ctor_params_bytes;
+
+  // Add a scoped cleanup jni reference object. This cleans up local refs made
+  // below.
+  JniLocalFrame jni_frame;
+  RETURN_IF_ERROR(jni_frame.push(env));
+
+  RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
+  // Create the java executor object
+  jni_ctx->executor = env->NewObject(executor_cl_, executor_ctor_id_, 
ctor_params_bytes);
   RETURN_ERROR_IF_EXC(env);
   RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jni_ctx->executor, 
&jni_ctx->executor));
 

http://git-wip-us.apache.org/repos/asf/impala/blob/d609fe1b/be/src/exprs/hive-udf-call.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/hive-udf-call.h b/be/src/exprs/hive-udf-call.h
index 8ca0372..7ce5eb0 100644
--- a/be/src/exprs/hive-udf-call.h
+++ b/be/src/exprs/hive-udf-call.h
@@ -116,6 +116,9 @@ class HiveUdfCall : public ScalarExpr {
   /// error.
   AnyVal* Evaluate(ScalarExprEvaluator* eval, const TupleRow* row) const;
 
+  /// The path on the local FS to the UDF's jar
+  std::string local_location_;
+
   /// input_byte_offsets_[i] is the byte offset child ith's input argument 
should
   /// be written to.
   std::vector<int> input_byte_offsets_;

http://git-wip-us.apache.org/repos/asf/impala/blob/d609fe1b/be/src/runtime/lib-cache.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/lib-cache.cc b/be/src/runtime/lib-cache.cc
index b4a4f59..d694c49 100644
--- a/be/src/runtime/lib-cache.cc
+++ b/be/src/runtime/lib-cache.cc
@@ -129,10 +129,6 @@ LibCacheEntry::~LibCacheEntry() {
   unlink(local_path.c_str());
 }
 
-LibCacheEntryHandle::~LibCacheEntryHandle() {
-  if (entry_ != nullptr) LibCache::instance()->DecrementUseCount(entry_);
-}
-
 Status LibCache::GetSoFunctionPtr(const string& hdfs_lib_file, const string& 
symbol,
     void** fn_ptr, LibCacheEntry** ent, bool quiet) {
   if (hdfs_lib_file.empty()) {
@@ -177,23 +173,21 @@ void LibCache::DecrementUseCount(LibCacheEntry* entry) {
   if (entry == NULL) return;
   bool can_delete = false;
   {
-    unique_lock<mutex> lock(entry->lock);
+    unique_lock<mutex> lock(entry->lock);;
     --entry->use_count;
     can_delete = (entry->use_count == 0 && entry->should_remove);
   }
   if (can_delete) delete entry;
 }
 
-Status LibCache::GetLocalPath(const std::string& hdfs_lib_file, LibType type,
-    LibCacheEntryHandle* handle, string* path) {
-  DCHECK(handle != nullptr && handle->entry() == nullptr);
-  LibCacheEntry* entry = nullptr;
+Status LibCache::GetLocalLibPath(const string& hdfs_lib_file, LibType type,
+                                 string* local_path) {
   unique_lock<mutex> lock;
+  LibCacheEntry* entry = NULL;
   RETURN_IF_ERROR(GetCacheEntry(hdfs_lib_file, type, &lock, &entry));
-  DCHECK(entry != nullptr);
-  ++entry->use_count;
-  handle->SetEntry(entry);
-  *path = entry->local_path;
+  DCHECK(entry != NULL);
+  DCHECK_EQ(entry->type, type);
+  *local_path = entry->local_path;
   return Status::OK();
 }
 
@@ -358,7 +352,7 @@ Status LibCache::GetCacheEntryInternal(const string& 
hdfs_lib_file, LibType type
     entry_lock->swap(local_entry_lock);
 
     RETURN_IF_ERROR((*entry)->copy_file_status);
-    DCHECK_EQ((*entry)->type, type) << (*entry)->local_path;
+    DCHECK_EQ((*entry)->type, type);
     DCHECK(!(*entry)->local_path.empty());
     return Status::OK();
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/d609fe1b/be/src/runtime/lib-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/lib-cache.h b/be/src/runtime/lib-cache.h
index 7296a00..4a564ee 100644
--- a/be/src/runtime/lib-cache.h
+++ b/be/src/runtime/lib-cache.h
@@ -49,16 +49,11 @@ class RuntimeState;
 /// using the library. When the caller requests a ptr into the library, they
 /// are given the entry handle and must decrement the ref count when they
 /// are done.
-/// Note: Explicitly managing this reference count at the client is 
error-prone. See the
-/// api for accessing a path, GetLocalPath(), that uses the handle's scope to 
manage the
-/// reference count.
 //
 /// TODO:
 /// - refresh libraries
-/// - better cached module management
-/// - improve the api to be less error-prone (IMPALA-6439)
+/// - better cached module management.
 struct LibCacheEntry;
-class LibCacheEntryHandle;
 
 class LibCache {
  public:
@@ -76,16 +71,11 @@ class LibCache {
   /// Initializes the libcache. Must be called before any other APIs.
   static Status Init();
 
-  /// Gets the local 'path' used to cache the file stored at the global 
'hdfs_lib_file'.
-  /// If the referenced global file has not been copied locally, it copies it 
and
-  /// caches the result.
-  ///
-  /// 'handle' must remain in scope while 'path' is used. The reference count 
to the
-  /// underlying cache entry is decremented when 'handle' goes out-of-scope.
-  ///
-  /// Returns an error if 'hdfs_lib_file' cannot be copied to the local fs.
-  Status GetLocalPath(const std::string& hdfs_lib_file, LibType type,
-      LibCacheEntryHandle* handle, string* path);
+  /// Gets the local file system path for the library at 'hdfs_lib_file'. If
+  /// this file is not already on the local fs, it copies it and caches the
+  /// result. Returns an error if 'hdfs_lib_file' cannot be copied to the 
local fs.
+  Status GetLocalLibPath(const std::string& hdfs_lib_file, LibType type,
+                         std::string* local_path);
 
   /// Returns status.ok() if the symbol exists in 'hdfs_lib_file', non-ok 
otherwise.
   /// If 'quiet' is true, the error status for non-Java unfound symbols will 
not be logged.
@@ -104,7 +94,6 @@ class LibCache {
   /// using fn_ptr and it is no longer valid to use fn_ptr.
   //
   /// If 'quiet' is true, returned error statuses will not be logged.
-  /// TODO: api is error-prone. upgrade to LibCacheEntryHandle (see 
IMPALA-6439).
   Status GetSoFunctionPtr(const std::string& hdfs_lib_file, const std::string& 
symbol,
       void** fn_ptr, LibCacheEntry** entry, bool quiet = false);
 
@@ -175,27 +164,6 @@ class LibCache {
                            const LibMap::iterator& entry_iterator);
 };
 
-/// Handle for a LibCacheEntry that decrements its reference count when the 
handle is
-/// destroyed or re-used for another entry.
-class LibCacheEntryHandle {
- public:
-  LibCacheEntryHandle() {}
-  ~LibCacheEntryHandle();
-
- private:
-  friend class LibCache;
-
-  LibCacheEntry* entry() const { return entry_; }
-  void SetEntry(LibCacheEntry* entry) {
-    if (entry_ != nullptr) LibCache::instance()->DecrementUseCount(entry);
-    entry_ = entry;
-  }
-
-  LibCacheEntry* entry_ = nullptr;
-
-  DISALLOW_COPY_AND_ASSIGN(LibCacheEntryHandle);
-};
-
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/impala/blob/d609fe1b/be/src/service/fe-support.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index 12ac874..d1979e7 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -296,11 +296,9 @@ static void ResolveSymbolLookup(const TSymbolLookupParams 
params,
   if (params.fn_binary_type != TFunctionBinaryType::BUILTIN) {
     // Refresh the library if necessary since we're creating a new function
     LibCache::instance()->SetNeedsRefresh(params.location);
-    LibCacheEntryHandle handle;
     string dummy_local_path;
-    Status status = LibCache::instance()->GetLocalPath(
-        params.location, type, &handle, &dummy_local_path);
-
+    Status status = LibCache::instance()->GetLocalLibPath(
+        params.location, type, &dummy_local_path);
     if (!status.ok()) {
       result->__set_result_code(TSymbolLookupResultCode::BINARY_NOT_FOUND);
       result->__set_error_msg(status.GetDetail());
@@ -391,10 +389,9 @@ Java_org_apache_impala_service_FeSupport_NativeCacheJar(
       JniUtil::internal_exc_class(), nullptr);
 
   TCacheJarResult result;
-  LibCacheEntryHandle handle;
   string local_path;
-  Status status = LibCache::instance()->GetLocalPath(
-      params.hdfs_location, LibCache::TYPE_JAR, &handle, &local_path);
+  Status status = LibCache::instance()->GetLocalLibPath(params.hdfs_location,
+      LibCache::TYPE_JAR, &local_path);
   status.ToThrift(&result.status);
   if (status.ok()) result.__set_local_path(local_path);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/d609fe1b/tests/query_test/test_udfs.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_udfs.py b/tests/query_test/test_udfs.py
index 61dd54c..1ff716a 100644
--- a/tests/query_test/test_udfs.py
+++ b/tests/query_test/test_udfs.py
@@ -18,9 +18,6 @@
 from copy import copy
 import os
 import pytest
-import random
-import threading
-import time
 from subprocess import check_call
 
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
@@ -319,6 +316,8 @@ class TestUdfExecution(TestUdfBase):
       self.run_test_case('QueryTest/udf-non-deterministic', vector,
           use_db=unique_database)
 
+  # Runs serially as a temporary workaround for IMPALA-6092.
+  @pytest.mark.execute_serially
   def test_java_udfs(self, vector, unique_database):
     self.run_test_case('QueryTest/load-java-udfs', vector, 
use_db=unique_database)
     self.run_test_case('QueryTest/java-udf', vector, use_db=unique_database)
@@ -419,6 +418,9 @@ class TestUdfTargeted(TestUdfBase):
             unique_database, tgt_udf_path))
     query = "select `{0}`.fn_invalid_symbol('test')".format(unique_database)
 
+    # Dropping the function can interact with other tests whose Java classes 
are in
+    # the same jar. Use a copy of the jar to avoid unintended interactions.
+    # See IMPALA-6215 and IMPALA-6092 for examples.
     check_call(["hadoop", "fs", "-put", "-f", src_udf_path, tgt_udf_path])
     self.client.execute(drop_fn_stmt)
     self.client.execute(create_fn_stmt)
@@ -427,86 +429,6 @@ class TestUdfTargeted(TestUdfBase):
       assert "Unable to find class" in str(ex)
     self.client.execute(drop_fn_stmt)
 
-  def test_concurrent_jar_drop_use(self, vector, unique_database):
-    """IMPALA-6215: race between dropping/using java udf's defined in the same 
jar.
-       This test runs concurrent drop/use threads that result in class not 
found
-       exceptions when the race is present.
-    """
-    udf_src_path = os.path.join(
-      os.environ['IMPALA_HOME'], "testdata/udfs/impala-hive-udfs.jar")
-    udf_tgt_path = get_fs_path(
-      '/test-warehouse/impala-hive-udfs-{0}.jar'.format(unique_database))
-
-    create_fn_to_drop = """create function {0}.foo_{1}() returns string
-                           LOCATION '{2}' 
SYMBOL='org.apache.impala.TestUpdateUdf'"""
-    create_fn_to_use = """create function {0}.use_it(string) returns string
-                          LOCATION '{1}' SYMBOL='org.apache.impala.TestUdf'"""
-    drop_fn = "drop function if exists {0}.foo_{1}()"
-    use_fn = """select * from (select max(int_col) from functional.alltypesagg
-                where {0}.use_it(string_col) = 'blah' union all
-                (select max(int_col) from functional.alltypesagg
-                 where {0}.use_it(String_col) > '1' union all
-                (select max(int_col) from functional.alltypesagg
-                 where {0}.use_it(string_col) > '1'))) v"""
-    num_drops = 100
-    num_uses = 100
-
-    # use a unique jar for this test to avoid interactions with other tests
-    # that use the same jar
-    check_call(["hadoop", "fs", "-put", "-f", udf_src_path, udf_tgt_path])
-
-    # create all the functions.
-    setup_client = self.create_impala_client()
-    try:
-      s = create_fn_to_use.format(unique_database, udf_tgt_path)
-      print "use create: " + s
-      setup_client.execute(s)
-    except Exception as e:
-      print e
-      assert False
-    for i in range(0, num_drops):
-      try:
-        setup_client.execute(create_fn_to_drop.format(unique_database, i, 
udf_tgt_path))
-      except Exception as e:
-        print e
-        assert False
-
-    errors = []
-    def use_fn_method():
-      time.sleep(5 + random.random())
-      client = self.create_impala_client()
-      try:
-        client.execute(use_fn.format(unique_database))
-      except Exception as e: errors.append(e)
-
-    def drop_fn_method(i):
-      time.sleep(1 + random.random())
-      client = self.create_impala_client()
-      try:
-        client.execute(drop_fn.format(unique_database, i))
-      except Exception as e: errors.append(e)
-
-    # create threads to use functions.
-    runner_threads = []
-    for i in range(0, num_uses):
-      runner_threads.append(threading.Thread(target=use_fn_method))
-
-    # create threads to drop functions.
-    drop_threads = []
-    for i in range(0, num_drops):
-      runner_threads.append(threading.Thread(target=drop_fn_method, args=(i, 
)))
-
-    # launch all runner threads.
-    for t in runner_threads: t.start()
-
-    # join all threads.
-    for t in runner_threads: t.join();
-
-    # Check for any errors.
-    for e in errors: print e
-    assert len(errors) == 0
-
-
   @SkipIfLocal.multiple_impalad
   def test_hive_udfs_missing_jar(self, vector, unique_database):
     """ IMPALA-2365: Impalad shouldn't crash if the udf jar isn't present

Reply via email to