Revert "IMPALA-6215: Removes race when using LibCache."

This reverts commit 4aafa5e9ba9fe22d2dbc7764a796b3cd04136cc0.

See IMPALA-6488 for an example of a crash that this revert is trying to avoid.

Change-Id: I2e0a22d38f15fb3e34f08633ab0fc7c87c92d40f
Reviewed-on: http://gerrit.cloudera.org:8080/9244
Reviewed-by: Alex Behm <alex.b...@cloudera.com>
Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com>
Reviewed-by: Vuk Ercegovac <vercego...@cloudera.com>
Tested-by: Thomas Tauber-Marshall <tmarsh...@cloudera.com>

Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d609fe1b
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d609fe1b
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d609fe1b

Branch: refs/heads/2.x
Commit: d609fe1b44745abc979dc8b7884ecff356ea73cf
Parents: 1ddb156
Author: Vuk Ercegovac <vercego...@cloudera.com>
Authored: Wed Feb 7 11:24:50 2018 -0800
Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org>
Committed: Thu Feb 8 07:01:53 2018 +0000

----------------------------------------------------------------------
 be/src/codegen/llvm-codegen.cc               |  5 +-
 be/src/exec/external-data-source-executor.cc |  5 +-
 be/src/exprs/hive-udf-call.cc                | 57 ++++++++-------
 be/src/exprs/hive-udf-call.h                 |  3 +
 be/src/runtime/lib-cache.cc                  | 22 +++---
 be/src/runtime/lib-cache.h                   | 44 ++----------
 be/src/service/fe-support.cc                 | 11 ++-
 tests/query_test/test_udfs.py                | 88 ++---------------------
 8 files changed, 58 insertions(+), 177 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/impala/blob/d609fe1b/be/src/codegen/llvm-codegen.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen.cc b/be/src/codegen/llvm-codegen.cc
index 72293a7..e1a606c 100644
--- a/be/src/codegen/llvm-codegen.cc
+++ b/be/src/codegen/llvm-codegen.cc
@@ -327,10 +327,9 @@ Status LlvmCodeGen::LinkModuleFromLocalFs(const string& file) {
 Status LlvmCodeGen::LinkModuleFromHdfs(const string& hdfs_location) 
{ if (linked_modules_.find(hdfs_location) != linked_modules_.end()) return Status::OK(); - LibCacheEntryHandle handle; string local_path; - RETURN_IF_ERROR(LibCache::instance()->GetLocalPath(hdfs_location, LibCache::TYPE_IR, - &handle, &local_path)); + RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath(hdfs_location, LibCache::TYPE_IR, + &local_path)); RETURN_IF_ERROR(LinkModuleFromLocalFs(local_path)); linked_modules_.insert(hdfs_location); return Status::OK(); http://git-wip-us.apache.org/repos/asf/impala/blob/d609fe1b/be/src/exec/external-data-source-executor.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/external-data-source-executor.cc b/be/src/exec/external-data-source-executor.cc index 20fe50e..7c54f39 100644 --- a/be/src/exec/external-data-source-executor.cc +++ b/be/src/exec/external-data-source-executor.cc @@ -136,10 +136,9 @@ ExternalDataSourceExecutor::~ExternalDataSourceExecutor() { Status ExternalDataSourceExecutor::Init(const string& jar_path, const string& class_name, const string& api_version, const string& init_string) { DCHECK(!is_initialized_); - LibCacheEntryHandle handle; string local_jar_path; - RETURN_IF_ERROR(LibCache::instance()->GetLocalPath( - jar_path, LibCache::TYPE_JAR, &handle, &local_jar_path)); + RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath( + jar_path, LibCache::TYPE_JAR, &local_jar_path)); JNIEnv* jni_env = getJNIEnv(); http://git-wip-us.apache.org/repos/asf/impala/blob/d609fe1b/be/src/exprs/hive-udf-call.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/hive-udf-call.cc b/be/src/exprs/hive-udf-call.cc index e1ac676..19e2e63 100644 --- a/be/src/exprs/hive-udf-call.cc +++ b/be/src/exprs/hive-udf-call.cc @@ -174,6 +174,10 @@ Status HiveUdfCall::Init(const RowDescriptor& row_desc, RuntimeState* state) { // Initialize children first. 
RETURN_IF_ERROR(ScalarExpr::Init(row_desc, state)); + // Copy the Hive Jar from hdfs to local file system. + RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath( + fn_.hdfs_location, LibCache::TYPE_JAR, &local_location_)); + // Initialize input_byte_offsets_ and input_buffer_size_ for (int i = 0; i < GetNumChildren(); ++i) { input_byte_offsets_.push_back(input_buffer_size_); @@ -198,35 +202,30 @@ Status HiveUdfCall::OpenEvaluator(FunctionContext::FunctionStateScope scope, JNIEnv* env = getJNIEnv(); if (env == NULL) return Status("Failed to get/create JVM"); - { - LibCacheEntryHandle handle; - string local_location; - RETURN_IF_ERROR(LibCache::instance()->GetLocalPath( - fn_.hdfs_location, LibCache::TYPE_JAR, &handle, &local_location)); - THiveUdfExecutorCtorParams ctor_params; - ctor_params.fn = fn_; - ctor_params.local_location = local_location; - ctor_params.input_byte_offsets = input_byte_offsets_; - - jni_ctx->input_values_buffer = new uint8_t[input_buffer_size_]; - jni_ctx->input_nulls_buffer = new uint8_t[GetNumChildren()]; - jni_ctx->output_value_buffer = new uint8_t[type().GetSlotSize()]; - - ctor_params.input_buffer_ptr = (int64_t)jni_ctx->input_values_buffer; - ctor_params.input_nulls_ptr = (int64_t)jni_ctx->input_nulls_buffer; - ctor_params.output_buffer_ptr = (int64_t)jni_ctx->output_value_buffer; - ctor_params.output_null_ptr = (int64_t)&jni_ctx->output_null_value; - - jbyteArray ctor_params_bytes; - - // Add a scoped cleanup jni reference object. This cleans up local refs made below. 
- JniLocalFrame jni_frame; - RETURN_IF_ERROR(jni_frame.push(env)); - - RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes)); - // Create the java executor object - jni_ctx->executor = env->NewObject(executor_cl_, executor_ctor_id_, ctor_params_bytes); - } + THiveUdfExecutorCtorParams ctor_params; + ctor_params.fn = fn_; + ctor_params.local_location = local_location_; + ctor_params.input_byte_offsets = input_byte_offsets_; + + jni_ctx->input_values_buffer = new uint8_t[input_buffer_size_]; + jni_ctx->input_nulls_buffer = new uint8_t[GetNumChildren()]; + jni_ctx->output_value_buffer = new uint8_t[type().GetSlotSize()]; + + ctor_params.input_buffer_ptr = (int64_t)jni_ctx->input_values_buffer; + ctor_params.input_nulls_ptr = (int64_t)jni_ctx->input_nulls_buffer; + ctor_params.output_buffer_ptr = (int64_t)jni_ctx->output_value_buffer; + ctor_params.output_null_ptr = (int64_t)&jni_ctx->output_null_value; + + jbyteArray ctor_params_bytes; + + // Add a scoped cleanup jni reference object. This cleans up local refs made + // below. + JniLocalFrame jni_frame; + RETURN_IF_ERROR(jni_frame.push(env)); + + RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes)); + // Create the java executor object + jni_ctx->executor = env->NewObject(executor_cl_, executor_ctor_id_, ctor_params_bytes); RETURN_ERROR_IF_EXC(env); RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jni_ctx->executor, &jni_ctx->executor)); http://git-wip-us.apache.org/repos/asf/impala/blob/d609fe1b/be/src/exprs/hive-udf-call.h ---------------------------------------------------------------------- diff --git a/be/src/exprs/hive-udf-call.h b/be/src/exprs/hive-udf-call.h index 8ca0372..7ce5eb0 100644 --- a/be/src/exprs/hive-udf-call.h +++ b/be/src/exprs/hive-udf-call.h @@ -116,6 +116,9 @@ class HiveUdfCall : public ScalarExpr { /// error. 
AnyVal* Evaluate(ScalarExprEvaluator* eval, const TupleRow* row) const; + /// The path on the local FS to the UDF's jar + std::string local_location_; + /// input_byte_offsets_[i] is the byte offset child ith's input argument should /// be written to. std::vector<int> input_byte_offsets_; http://git-wip-us.apache.org/repos/asf/impala/blob/d609fe1b/be/src/runtime/lib-cache.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/lib-cache.cc b/be/src/runtime/lib-cache.cc index b4a4f59..d694c49 100644 --- a/be/src/runtime/lib-cache.cc +++ b/be/src/runtime/lib-cache.cc @@ -129,10 +129,6 @@ LibCacheEntry::~LibCacheEntry() { unlink(local_path.c_str()); } -LibCacheEntryHandle::~LibCacheEntryHandle() { - if (entry_ != nullptr) LibCache::instance()->DecrementUseCount(entry_); -} - Status LibCache::GetSoFunctionPtr(const string& hdfs_lib_file, const string& symbol, void** fn_ptr, LibCacheEntry** ent, bool quiet) { if (hdfs_lib_file.empty()) { @@ -177,23 +173,21 @@ void LibCache::DecrementUseCount(LibCacheEntry* entry) { if (entry == NULL) return; bool can_delete = false; { - unique_lock<mutex> lock(entry->lock); + unique_lock<mutex> lock(entry->lock);; --entry->use_count; can_delete = (entry->use_count == 0 && entry->should_remove); } if (can_delete) delete entry; } -Status LibCache::GetLocalPath(const std::string& hdfs_lib_file, LibType type, - LibCacheEntryHandle* handle, string* path) { - DCHECK(handle != nullptr && handle->entry() == nullptr); - LibCacheEntry* entry = nullptr; +Status LibCache::GetLocalLibPath(const string& hdfs_lib_file, LibType type, + string* local_path) { unique_lock<mutex> lock; + LibCacheEntry* entry = NULL; RETURN_IF_ERROR(GetCacheEntry(hdfs_lib_file, type, &lock, &entry)); - DCHECK(entry != nullptr); - ++entry->use_count; - handle->SetEntry(entry); - *path = entry->local_path; + DCHECK(entry != NULL); + DCHECK_EQ(entry->type, type); + *local_path = entry->local_path; return Status::OK(); } @@ -358,7 
+352,7 @@ Status LibCache::GetCacheEntryInternal(const string& hdfs_lib_file, LibType type entry_lock->swap(local_entry_lock); RETURN_IF_ERROR((*entry)->copy_file_status); - DCHECK_EQ((*entry)->type, type) << (*entry)->local_path; + DCHECK_EQ((*entry)->type, type); DCHECK(!(*entry)->local_path.empty()); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/impala/blob/d609fe1b/be/src/runtime/lib-cache.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/lib-cache.h b/be/src/runtime/lib-cache.h index 7296a00..4a564ee 100644 --- a/be/src/runtime/lib-cache.h +++ b/be/src/runtime/lib-cache.h @@ -49,16 +49,11 @@ class RuntimeState; /// using the library. When the caller requests a ptr into the library, they /// are given the entry handle and must decrement the ref count when they /// are done. -/// Note: Explicitly managing this reference count at the client is error-prone. See the -/// api for accessing a path, GetLocalPath(), that uses the handle's scope to manage the -/// reference count. // /// TODO: /// - refresh libraries -/// - better cached module management -/// - improve the api to be less error-prone (IMPALA-6439) +/// - better cached module management. struct LibCacheEntry; -class LibCacheEntryHandle; class LibCache { public: @@ -76,16 +71,11 @@ class LibCache { /// Initializes the libcache. Must be called before any other APIs. static Status Init(); - /// Gets the local 'path' used to cache the file stored at the global 'hdfs_lib_file'. - /// If the referenced global file has not been copied locally, it copies it and - /// caches the result. - /// - /// 'handle' must remain in scope while 'path' is used. The reference count to the - /// underlying cache entry is decremented when 'handle' goes out-of-scope. - /// - /// Returns an error if 'hdfs_lib_file' cannot be copied to the local fs. 
- Status GetLocalPath(const std::string& hdfs_lib_file, LibType type, - LibCacheEntryHandle* handle, string* path); + /// Gets the local file system path for the library at 'hdfs_lib_file'. If + /// this file is not already on the local fs, it copies it and caches the + /// result. Returns an error if 'hdfs_lib_file' cannot be copied to the local fs. + Status GetLocalLibPath(const std::string& hdfs_lib_file, LibType type, + std::string* local_path); /// Returns status.ok() if the symbol exists in 'hdfs_lib_file', non-ok otherwise. /// If 'quiet' is true, the error status for non-Java unfound symbols will not be logged. @@ -104,7 +94,6 @@ class LibCache { /// using fn_ptr and it is no longer valid to use fn_ptr. // /// If 'quiet' is true, returned error statuses will not be logged. - /// TODO: api is error-prone. upgrade to LibCacheEntryHandle (see IMPALA-6439). Status GetSoFunctionPtr(const std::string& hdfs_lib_file, const std::string& symbol, void** fn_ptr, LibCacheEntry** entry, bool quiet = false); @@ -175,27 +164,6 @@ class LibCache { const LibMap::iterator& entry_iterator); }; -/// Handle for a LibCacheEntry that decrements its reference count when the handle is -/// destroyed or re-used for another entry. 
-class LibCacheEntryHandle { - public: - LibCacheEntryHandle() {} - ~LibCacheEntryHandle(); - - private: - friend class LibCache; - - LibCacheEntry* entry() const { return entry_; } - void SetEntry(LibCacheEntry* entry) { - if (entry_ != nullptr) LibCache::instance()->DecrementUseCount(entry); - entry_ = entry; - } - - LibCacheEntry* entry_ = nullptr; - - DISALLOW_COPY_AND_ASSIGN(LibCacheEntryHandle); -}; - } #endif http://git-wip-us.apache.org/repos/asf/impala/blob/d609fe1b/be/src/service/fe-support.cc ---------------------------------------------------------------------- diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc index 12ac874..d1979e7 100644 --- a/be/src/service/fe-support.cc +++ b/be/src/service/fe-support.cc @@ -296,11 +296,9 @@ static void ResolveSymbolLookup(const TSymbolLookupParams params, if (params.fn_binary_type != TFunctionBinaryType::BUILTIN) { // Refresh the library if necessary since we're creating a new function LibCache::instance()->SetNeedsRefresh(params.location); - LibCacheEntryHandle handle; string dummy_local_path; - Status status = LibCache::instance()->GetLocalPath( - params.location, type, &handle, &dummy_local_path); - + Status status = LibCache::instance()->GetLocalLibPath( + params.location, type, &dummy_local_path); if (!status.ok()) { result->__set_result_code(TSymbolLookupResultCode::BINARY_NOT_FOUND); result->__set_error_msg(status.GetDetail()); @@ -391,10 +389,9 @@ Java_org_apache_impala_service_FeSupport_NativeCacheJar( JniUtil::internal_exc_class(), nullptr); TCacheJarResult result; - LibCacheEntryHandle handle; string local_path; - Status status = LibCache::instance()->GetLocalPath( - params.hdfs_location, LibCache::TYPE_JAR, &handle, &local_path); + Status status = LibCache::instance()->GetLocalLibPath(params.hdfs_location, + LibCache::TYPE_JAR, &local_path); status.ToThrift(&result.status); if (status.ok()) result.__set_local_path(local_path); 
http://git-wip-us.apache.org/repos/asf/impala/blob/d609fe1b/tests/query_test/test_udfs.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_udfs.py b/tests/query_test/test_udfs.py index 61dd54c..1ff716a 100644 --- a/tests/query_test/test_udfs.py +++ b/tests/query_test/test_udfs.py @@ -18,9 +18,6 @@ from copy import copy import os import pytest -import random -import threading -import time from subprocess import check_call from tests.beeswax.impala_beeswax import ImpalaBeeswaxException @@ -319,6 +316,8 @@ class TestUdfExecution(TestUdfBase): self.run_test_case('QueryTest/udf-non-deterministic', vector, use_db=unique_database) + # Runs serially as a temporary workaround for IMPALA_6092. + @pytest.mark.execute_serially def test_java_udfs(self, vector, unique_database): self.run_test_case('QueryTest/load-java-udfs', vector, use_db=unique_database) self.run_test_case('QueryTest/java-udf', vector, use_db=unique_database) @@ -419,6 +418,9 @@ class TestUdfTargeted(TestUdfBase): unique_database, tgt_udf_path)) query = "select `{0}`.fn_invalid_symbol('test')".format(unique_database) + # Dropping the function can interact with other tests whose Java classes are in + # the same jar. Use a copy of the jar to avoid unintended interactions. + # See IMPALA-6215 and IMPALA-6092 for examples. check_call(["hadoop", "fs", "-put", "-f", src_udf_path, tgt_udf_path]) self.client.execute(drop_fn_stmt) self.client.execute(create_fn_stmt) @@ -427,86 +429,6 @@ class TestUdfTargeted(TestUdfBase): assert "Unable to find class" in str(ex) self.client.execute(drop_fn_stmt) - def test_concurrent_jar_drop_use(self, vector, unique_database): - """IMPALA-6215: race between dropping/using java udf's defined in the same jar. - This test runs concurrent drop/use threads that result in class not found - exceptions when the race is present. 
- """ - udf_src_path = os.path.join( - os.environ['IMPALA_HOME'], "testdata/udfs/impala-hive-udfs.jar") - udf_tgt_path = get_fs_path( - '/test-warehouse/impala-hive-udfs-{0}.jar'.format(unique_database)) - - create_fn_to_drop = """create function {0}.foo_{1}() returns string - LOCATION '{2}' SYMBOL='org.apache.impala.TestUpdateUdf'""" - create_fn_to_use = """create function {0}.use_it(string) returns string - LOCATION '{1}' SYMBOL='org.apache.impala.TestUdf'""" - drop_fn = "drop function if exists {0}.foo_{1}()" - use_fn = """select * from (select max(int_col) from functional.alltypesagg - where {0}.use_it(string_col) = 'blah' union all - (select max(int_col) from functional.alltypesagg - where {0}.use_it(String_col) > '1' union all - (select max(int_col) from functional.alltypesagg - where {0}.use_it(string_col) > '1'))) v""" - num_drops = 100 - num_uses = 100 - - # use a unique jar for this test to avoid interactions with other tests - # that use the same jar - check_call(["hadoop", "fs", "-put", "-f", udf_src_path, udf_tgt_path]) - - # create all the functions. - setup_client = self.create_impala_client() - try: - s = create_fn_to_use.format(unique_database, udf_tgt_path) - print "use create: " + s - setup_client.execute(s) - except Exception as e: - print e - assert False - for i in range(0, num_drops): - try: - setup_client.execute(create_fn_to_drop.format(unique_database, i, udf_tgt_path)) - except Exception as e: - print e - assert False - - errors = [] - def use_fn_method(): - time.sleep(5 + random.random()) - client = self.create_impala_client() - try: - client.execute(use_fn.format(unique_database)) - except Exception as e: errors.append(e) - - def drop_fn_method(i): - time.sleep(1 + random.random()) - client = self.create_impala_client() - try: - client.execute(drop_fn.format(unique_database, i)) - except Exception as e: errors.append(e) - - # create threads to use functions. 
- runner_threads = [] - for i in range(0, num_uses): - runner_threads.append(threading.Thread(target=use_fn_method)) - - # create threads to drop functions. - drop_threads = [] - for i in range(0, num_drops): - runner_threads.append(threading.Thread(target=drop_fn_method, args=(i, ))) - - # launch all runner threads. - for t in runner_threads: t.start() - - # join all threads. - for t in runner_threads: t.join(); - - # Check for any errors. - for e in errors: print e - assert len(errors) == 0 - - @SkipIfLocal.multiple_impalad def test_hive_udfs_missing_jar(self, vector, unique_database): """ IMPALA-2365: Impalad shouldn't crash if the udf jar isn't present