Tim Armstrong has posted comments on this change. ( http://gerrit.cloudera.org:8080/9134 )
Change subject: IMPALA-5717: Support for reading ORC data files
......................................................................

Patch Set 5:

(39 comments)

Finally made time to do another detailed pass. Looks like we're getting closer. What I'm aiming for is good test coverage, clean code, and stability. I think this will initially be an experimental feature, so it's ok if we're missing some features or performance.

http://gerrit.cloudera.org:8080/#/c/9134/5//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/9134/5//COMMIT_MSG@15
PS5, Line 15: premitive
primitive.

http://gerrit.cloudera.org:8080/#/c/9134/5//COMMIT_MSG@19
PS5, Line 19: - Most of the end-to-end tests can run on ORC format.
Can you enable orc in test_scanners_fuzz.py and run that in a loop for a while? That can help flush out bugs in the scanner. I think this is the required diff:

tarmstrong@tarmstrong-box:~/Impala/incubator-impala$ git diff
diff --git a/tests/query_test/test_scanners_fuzz.py b/tests/query_test/test_scanners_fuzz.py
index c336a17..4b97a5f 100644
--- a/tests/query_test/test_scanners_fuzz.py
+++ b/tests/query_test/test_scanners_fuzz.py
@@ -62,7 +62,7 @@ class TestScannersFuzzing(ImpalaTestSuite):
         'mem_limit' : cls.MEM_LIMITS}))
     # TODO: enable for more table formats once they consistently pass the fuzz test.
     cls.ImpalaTestMatrix.add_constraint(lambda v:
-        v.get_value('table_format').file_format in ('avro', 'parquet') or
+        v.get_value('table_format').file_format in ('avro', 'parquet', 'orc') or
         (v.get_value('table_format').file_format == 'text' and
          v.get_value('table_format').compression_codec in ('none', 'lzo')))
@@ -194,11 +194,12 @@ class TestScannersFuzzing(ImpalaTestSuite):
             continue
           msg = "Should not throw error when abort_on_error=0: '{0}'".format(e)
           LOG.error(msg)
-          # Parquet and compressed text can fail the query for some parse errors.
+          # Parquet, ORC and compressed text can fail the query for some parse errors.
           # E.g. corrupt Parquet footer (IMPALA-3773) or a corrupt LZO index file
           # (IMPALA-4013).
           table_format = vector.get_value('table_format')
           if table_format.file_format != 'parquet' \
+              and table_format.file_format != 'orc' \
               and not (table_format.file_format == 'text' and
               table_format.compression_codec != 'none'):
             raise

I ran it for a few iterations and it found at least one crash:

(gdb) bt
#0  0x00007f035c3bc428 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54
#1  0x00007f035c3be02a in __GI_abort () at abort.c:89
#2  0x00007f035f300069 in ?? () from /usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64/server/libjvm.so
#3  0x00007f035f4b3997 in ?? () from /usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64/server/libjvm.so
#4  0x00007f035f4b40b7 in ?? () from /usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64/server/libjvm.so
#5  0x00007f035f304922 in ?? () from /usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64/server/libjvm.so
#6  0x00007f035f3094a9 in JVM_handle_linux_signal () from /usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64/server/libjvm.so
#7  0x00007f035f2fd1a8 in ?? () from /usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64/server/libjvm.so
#8  <signal handler called>
#9  0x00007f035f2fa936 in ?? () from /usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64/server/libjvm.so
#10 0x00007f035ef10fe1 in ?? () from /usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64/server/libjvm.so
#11 0x00007f035f4b29b2 in ?? () from /usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64/server/libjvm.so
#12 0x00007f035f4b34eb in ?? () from /usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64/server/libjvm.so
#13 0x00007f035f30954f in JVM_handle_linux_signal () from /usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64/server/libjvm.so
#14 0x00007f035f2fd1a8 in ?? () from /usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64/server/libjvm.so
#15 <signal handler called>
#16 0x0000000003d155e1 in orc::convertType(orc::proto::Type const&, orc::proto::Footer const&) ()
#17 0x0000000003d15928 in orc::convertType(orc::proto::Type const&, orc::proto::Footer const&) ()
#18 0x0000000003cffbb1 in orc::ReaderImpl::ReaderImpl(std::shared_ptr<orc::FileContents>, orc::ReaderOptions const&, unsigned long, unsigned long) ()
#19 0x0000000003d026dc in orc::createReader(std::unique_ptr<orc::InputStream, std::default_delete<orc::InputStream> >, orc::ReaderOptions const&) ()
#20 0x0000000001ceb163 in impala::HdfsOrcScanner::ProcessFileTail (this=0xa9722c0) at be/src/exec/hdfs-orc-scanner.cc:227
#21 0x0000000001cea885 in impala::HdfsOrcScanner::Open (this=0xa9722c0, context=0x7f02df7bf200) at be/src/exec/hdfs-orc-scanner.cc:167
#22 0x0000000001c2e919 in impala::HdfsScanNodeBase::CreateAndOpenScanner (this=0xc239500, partition=0xb8ae380, context=0x7f02df7bf200, scanner=0x7f02df7bf3d0) at be/src/exec/hdfs-scan-node-base.cc:598
#23 0x0000000001c235ad in impala::HdfsScanNode::ProcessSplit (this=0xc239500, filter_ctxs=std::vector of length 0, capacity 0, expr_results_pool=0x7f02df7bf5b0, scan_range=0x111ed860) at be/src/exec/hdfs-scan-node.cc:512
#24 0x0000000001c22d17 in impala::HdfsScanNode::ScannerThread (this=0xc239500) at be/src/exec/hdfs-scan-node.cc:437
#25 0x0000000001c2219f in impala::HdfsScanNode::<lambda()>::operator()(void) const (__closure=0x7f02df7bfce8) at be/src/exec/hdfs-scan-node.cc:350
#26 0x0000000001c2413d in boost::detail::function::void_function_obj_invoker0<impala::HdfsScanNode::ThreadTokenAvailableCb(impala::ThreadResourceMgr::ResourcePool*)::<lambda()>, void>::invoke(boost::detail::function::function_buffer &) (function_obj_ptr=...) at toolchain/boost-1.57.0-p3/include/boost/function/function_template.hpp:153
#27 0x000000000186f9d8 in boost::function0<void>::operator() (this=0x7f02df7bfce0) at toolchain/boost-1.57.0-p3/include/boost/function/function_template.hpp:767
#28 0x0000000001b7b84f in impala::Thread::SuperviseThread(std::string const&, std::string const&, boost::function<void ()>, impala::ThreadDebugInfo const*, impala::Promise<long>*) (name="scanner-thread (finst:314a746c71f56eb3:9c65ee200000002, plan-node-id:0, thread-idx:0)", category="fragment-execution", functor=..., parent_thread_info=0x7f02e3cce990, thread_started=0x7f02e3ccd690) at be/src/util/thread.cc:356
#29 0x0000000001b83d25 in boost::_bi::list5<boost::_bi::value<std::string>, boost::_bi::value<std::string>, boost::_bi::value<boost::function<void ()> >, boost::_bi::value<impala::ThreadDebugInfo*>, boost::_bi::value<impala::Promise<long>*> >::operator()<void (*)(std::string const&, std::string const&, boost::function<void ()>, impala::ThreadDebugInfo const*, impala::Promise<long>*), boost::_bi::list0>(boost::_bi::type<void>, void (*&)(std::string const&, std::string const&, boost::function<void ()>, impala::ThreadDebugInfo const*, impala::Promise<long>*), boost::_bi::list0&, int) (this=0xbac39c0, f=@0xbac39b8: 0x1b7b4e8 <impala::Thread::SuperviseThread(std::string const&, std::string const&, boost::function<void ()>, impala::ThreadDebugInfo const*, impala::Promise<long>*)>, a=...) at toolchain/boost-1.57.0-p3/include/boost/bind/bind.hpp:525
#30 0x0000000001b83c49 in boost::_bi::bind_t<void, void (*)(std::string const&, std::string const&, boost::function<void ()>, impala::ThreadDebugInfo const*, impala::Promise<long>*), boost::_bi::list5<boost::_bi::value<std::string>, boost::_bi::value<std::string>, boost::_bi::value<boost::function<void ()> >, boost::_bi::value<impala::ThreadDebugInfo*>, boost::_bi::value<impala::Promise<long>*> > >::operator()() (this=0xbac39b8) at toolchain/boost-1.57.0-p3/include/boost/bind/bind_template.hpp:20
#31 0x0000000001b83c0c in boost::detail::thread_data<boost::_bi::bind_t<void, void (*)(std::string const&, std::string const&, boost::function<void ()>, impala::ThreadDebugInfo const*, impala::Promise<long>*), boost::_bi::list5<boost::_bi::value<std::string>, boost::_bi::value<std::string>, boost::_bi::value<boost::function<void ()> >, boost::_bi::value<impala::ThreadDebugInfo*>, boost::_bi::value<impala::Promise<long>*> > > >::run() (this=0xbac3800) at toolchain/boost-1.57.0-p3/include/boost/thread/detail/thread.hpp:116
#32 0x0000000002e17faa in thread_proxy ()
#33 0x00007f035c7586ba in start_thread (arg=0x7f02df7c0700) at pthread_create.c:333
#34 0x00007f035c48e41d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:109

http://gerrit.cloudera.org:8080/#/c/9134/5//COMMIT_MSG@22
PS5, Line 22: - Have passed all the tests.
We're also missing test coverage for CHAR and VARCHAR. That looks like it was added specially for Parquet/Text/Avro with the chars_formats table, so we might need to do something similar for ORC.

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.h
File be/src/exec/hdfs-orc-scanner.h:

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.h@91
PS5, Line 91: virtual Status Open(ScannerContext *context) WARN_UNUSED_RESULT;
Can we add override to the functions where it fits?
http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.h@141
PS5, Line 141: map
Could this be unordered_map? We generally prefer that unless there's a requirement that it be ordered.

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.h@177
PS5, Line 177: inline
My intuition is that many of these "inline" specifiers will not be useful, either because the code isn't on the hot path or because they will not affect the compiler's decision. I'd consider removing them unless you have evidence that they help performance.

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc
File be/src/exec/hdfs-orc-scanner.cc:

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@59
PS5, Line 59: Clear
Let's call this FreeAll() to match the equivalent method on MemPool. MemPool::Clear() doesn't free the chunks, just marks them unused.

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@62
PS5, Line 62: std::
std:: prefix not needed for free().

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@71
PS5, Line 71: mem_tracker_->Consume(size);
We should do a TryConsume() here so that we don't go over the memory limit. That will also make it easier to test the error path. I think we want to return a proper MemLimitExceeded status. My suggestion is to create a special exception class that wraps a Status object. Then when we catch it we can unwrap and return the status.

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@72
PS5, Line 72: (std::m
std:: prefix not needed for malloc

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@77
PS5, Line 77: throw std::runtime_error("memory limit exceeded");
We could do the same status-wrapping thing here.
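To illustrate the status-wrapping idea from the last two comments, here's a rough sketch. Everything in it (ResourceError, the cut-down Status, the function names) is an illustrative stand-in, not Impala's actual classes:

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <utility>

// Cut-down stand-in for impala::Status, just to show the shape of the pattern.
struct Status {
  std::string msg;
  bool ok() const { return msg.empty(); }
};

// Exception that carries a Status across the ORC library boundary. The ORC
// C++ reader reports errors via exceptions, so this lets the allocator signal
// a memory-limit failure without losing the structured error.
class ResourceError : public std::runtime_error {
 public:
  explicit ResourceError(Status status)
      : std::runtime_error(status.msg), status_(std::move(status)) {}
  const Status& status() const { return status_; }

 private:
  Status status_;
};

// In the allocator callback: throw when TryConsume()-style accounting fails,
// instead of consuming past the limit. 'try_consume_ok' stands in for the
// result of a real TryConsume() call.
char* Allocate(std::size_t size, bool try_consume_ok) {
  if (!try_consume_ok) throw ResourceError(Status{"Memory limit exceeded"});
  return new char[size];
}

// At the call site that drives the ORC reader: catch the wrapper and unwrap
// it back into an ordinary Status return.
Status ProcessFileTail(bool within_limit) {
  try {
    char* buf = Allocate(16, within_limit);
    delete[] buf;
  } catch (const ResourceError& e) {
    return e.status();  // propagate on the normal Status path
  }
  return Status{};
}
```

That keeps all of the scanner's externally visible error handling on the usual Status path while still playing nicely with the exception-based ORC API.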
http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@90
PS5, Line 90: if (ImpaladMetrics::MEM_POOL_TOTAL_BYTES != nullptr) {
Null check shouldn't be needed, right? We don't have it above.

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@182
PS5, Line 182:
Remove extra blank line. There is a bit too much vertical whitespace (i.e. blank lines) in this function, so it might be good to remove some.

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@208
PS5, Line 208: &&
             :     reader_->getCompression() != orc::CompressionKind::CompressionKind_NONE
CompressionKind_NONE is already handled in TranslateCompressionKind, right?

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@229
PS5, Line 229: Encounter
Encountered

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@230
PS5, Line 230: Encounter
Encountered. Can you also include the file name and mention "ORC" in the error message, e.g. "Encountered parse error in tail of ORC file /test-warehouse/table/foo.orc: "

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@233
PS5, Line 233:
We can remove most of the blank lines in this function.

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@234
PS5, Line 234: if (reader_->getNumberOfRows() == 0) {
Nit: can fit if() and return on one line.

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@240
PS5, Line 240: return Status(Substitute("Invalid file: $0. No stripes in this file but numberOfRows"
"Invalid ORC file"

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@250
PS5, Line 250: case orc::CompressionKind::CompressionKind_NONE:return THdfsCompression::NONE;
Nit: missing space

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@259
PS5, Line 259: VLOG_QUERY << "ORC files in ZSTD compression are counted as DEFAULT in profile";
We should return the error in a Status if the compression type isn't handled. We want to report the problem to the user in a helpful way.

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@281
PS5, Line 281: nullptr
NULL, not nullptr, since this is just a null indicator bit.

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@291
PS5, Line 291: col_id_slot_map_[reader_->getType().getSubtype(col_idx_in_file)->getColumnId()] = slot_desc;
nit: long line

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@292
PS5, Line 292: const ColumnType &col_type = scan_node_->hdfs_table()->col_descs()[col_idx].type();
We should validate that the ORC and Impala column types are compatible at this point, instead of trying to handle it later.

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@306
PS5, Line 306: std::m
nit: std:: should not be necessary since this is imported by names.h

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@388
PS5, Line 388:
unnecessary blank line

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@489
PS5, Line 489: VLOG_QUERY << "Encounter parse error: " << e.what()
             :            << " which is due to cancellation";
We don't need to log anything if we're cancelling normally. Why do we need to handle this case specially anyway? If we hit an error can't we just return that error? Impala already will ignore errors that happen after a user cancels the query.
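Going back to the comment on line 259, what I have in mind for TranslateCompressionKind is roughly the following sketch. The enums here are simplified stand-ins for orc::CompressionKind and Impala's THdfsCompression, and the cut-down Status is illustrative:

```cpp
#include <cassert>
#include <string>

// Simplified stand-ins for orc::CompressionKind and THdfsCompression.
enum class OrcCompression { NONE, ZLIB, SNAPPY, LZO, LZ4, ZSTD };
enum class HdfsCompression { NONE, DEFAULT, SNAPPY, LZO };

// Cut-down stand-in for impala::Status.
struct Status {
  std::string msg;
  bool ok() const { return msg.empty(); }
};

// Translate the kinds we support; surface anything unrecognized as an error
// Status so the user sees the real problem instead of a misleading profile
// entry that counts the file as DEFAULT compression.
Status TranslateCompressionKind(OrcCompression kind, HdfsCompression* out) {
  switch (kind) {
    case OrcCompression::NONE: *out = HdfsCompression::NONE; return Status{};
    case OrcCompression::ZLIB: *out = HdfsCompression::DEFAULT; return Status{};
    case OrcCompression::SNAPPY: *out = HdfsCompression::SNAPPY; return Status{};
    case OrcCompression::LZO: *out = HdfsCompression::LZO; return Status{};
    default:
      return Status{"Unsupported ORC compression kind: " +
                    std::to_string(static_cast<int>(kind))};
  }
}
```

Then the caller propagates the Status with RETURN_IF_ERROR-style handling rather than logging and guessing.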
http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@552
PS5, Line 552: VLOG_FILE
Logging once per batch in VLOG_FILE seems too verbose. Remove or move to VLOG_ROW

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@567
PS5, Line 567: // TODO: combine this with the Parquet implementation
Can we do this as part of this patch? I'd prefer to avoid adding duplicate code.

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@590
PS5, Line 590: ColumnVectorBatch
I think the argument to this function should be a StructVectorBatch instead of ColumnVectorBatch, because we require that the caller pass in a StructVectorBatch.

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@598
PS5, Line 598: if (slot_desc == nullptr) {
Can this be a DCHECK? Isn't this a bug in our code if the slot doesn't exist?

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@645
PS5, Line 645: case TYPE_FLOAT:*(reinterpret_cast<float*>(slot_val_ptr)) = val;
needs space after :

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@647
PS5, Line 647: case TYPE_DOUBLE:*(reinterpret_cast<double*>(slot_val_ptr)) = val;
needs space after :

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@657
PS5, Line 657: case orc::TypeKind::CHAR: {
I don't think this handles CHAR(N) or VARCHAR(N) types in Impala correctly. VARCHAR isn't too difficult - you just need to truncate val_ptr->len to the max varchar len. CHAR has a different memory layout - it is a fixed-size buffer at 'slot'. I'm ok with deferring work on these, but we should make sure that Impala fails gracefully.
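To sketch the difference between the two layouts (the StringValue struct and the writer functions here are illustrative, not the exact Impala definitions):

```cpp
#include <algorithm>
#include <cassert>
#include <cstring>
#include <string>

// Illustrative StringValue layout: pointer + length, as used for STRING and
// VARCHAR slots.
struct StringValue {
  const char* ptr;
  int len;
};

// VARCHAR(N): same slot layout as STRING, but the length must be capped at
// the declared maximum N.
void WriteVarcharSlot(const char* data, int len, int max_len, StringValue* slot) {
  slot->ptr = data;
  slot->len = std::min(len, max_len);  // truncate to the declared maximum
}

// CHAR(N): a fixed-size, space-padded buffer stored inline at the slot
// address - no pointer indirection at all.
void WriteCharSlot(const char* data, int len, int n, char* slot) {
  int copy_len = std::min(len, n);
  std::memcpy(slot, data, copy_len);
  std::memset(slot + copy_len, ' ', n - copy_len);  // pad out to exactly N bytes
}
```

So a scanner that writes a StringValue into a CHAR slot would corrupt the tuple, which is why we need at least a graceful failure for CHAR even if full support is deferred.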
http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@670
PS5, Line 670: return scan_node_->mem_tracker()->MemLimitExceeded(state_, details, val_ptr->len);
long line

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@693
PS5, Line 693: // TODO warn if slot_desc->type().GetByteSize() != 16
I think we should validate the relationship between Impala and ORC types when reading the file metadata. If the Impala decimal type is too small, we can just return an error instead of truncating the data.

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-scanner.h
File be/src/exec/hdfs-scanner.h:

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-scanner.h@326
PS5, Line 326: /// Size of the file footer. This is a guess. If this value is too little, we will
Mention ORC and Parquet? E.g. "Size of the file footer for ORC and Parquet"

http://gerrit.cloudera.org:8080/#/c/9134/5/testdata/LineItemMultiBlock/lineitem_orc_multiblock_one_stripe.orc
File testdata/LineItemMultiBlock/lineitem_orc_multiblock_one_stripe.orc:

PS5:
These files are pretty big - they'll increase the size of the Impala repo quite a bit. Can we reduce the amount of data in them and still get the same test coverage?

http://gerrit.cloudera.org:8080/#/c/9134/5/testdata/bin/load_nested.py
File testdata/bin/load_nested.py:

http://gerrit.cloudera.org:8080/#/c/9134/5/testdata/bin/load_nested.py@297
PS5, Line 297: def load_orc():
I don't think this is the best way to load the data - I think we should add it elsewhere. I'm not sure what the best approach is so I asked Joe McDonnell to comment.
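Back on the decimal comment at line 693: a conservative metadata-time check could look something like the sketch below. Names and the cut-down Status are illustrative, and whether a mismatched scale should be rejected outright (as here) or converted is a judgment call:

```cpp
#include <cassert>
#include <string>

// Cut-down stand-in for impala::Status.
struct Status {
  std::string msg;
  bool ok() const { return msg.empty(); }
};

// Run while reading file metadata: require that the file's decimal type fit
// the table's declared decimal type, so we fail up front with a clear error
// instead of silently truncating values when materializing rows.
Status CheckDecimalCompatibility(int file_precision, int file_scale,
                                 int table_precision, int table_scale) {
  if (file_precision > table_precision || file_scale != table_scale) {
    return Status{"File decimal(" + std::to_string(file_precision) + "," +
                  std::to_string(file_scale) +
                  ") is incompatible with table type decimal(" +
                  std::to_string(table_precision) + "," +
                  std::to_string(table_scale) + ")"};
  }
  return Status{};
}
```

Doing this per column while building the slot map means none of the per-row materialization paths need to worry about out-of-range decimals.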
--
To view, visit http://gerrit.cloudera.org:8080/9134
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ia7b6ae4ce3b9ee8125b21993702faa87537790a4
Gerrit-Change-Number: 9134
Gerrit-PatchSet: 5
Gerrit-Owner: Quanlong Huang <huangquanl...@gmail.com>
Gerrit-Reviewer: Dan Hecht <dhe...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <huangquanl...@gmail.com>
Gerrit-Reviewer: Tim Armstrong <tarmstr...@cloudera.com>
Gerrit-Comment-Date: Wed, 14 Mar 2018 23:00:01 +0000
Gerrit-HasComments: Yes