Tim Armstrong has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/9134 )

Change subject: IMPALA-5717: Support for reading ORC data files
......................................................................


Patch Set 5:

(39 comments)

Finally made time to do another detailed pass. Looks like we're getting closer.

What I'm aiming for is good test coverage, clean code, and stability. I think 
this will initially be an experimental feature, so it's ok if we're missing 
some features or performance.

http://gerrit.cloudera.org:8080/#/c/9134/5//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/9134/5//COMMIT_MSG@15
PS5, Line 15: premitive
primitive.


http://gerrit.cloudera.org:8080/#/c/9134/5//COMMIT_MSG@19
PS5, Line 19:  - Most of the end-to-end tests can run on ORC format.
Can you enable orc in test_scanners_fuzz.py and run that in a loop for a while? 
That can help flush out bugs in the scanner. I think this is the required diff:

tarmstrong@tarmstrong-box:~/Impala/incubator-impala$ git diff
diff --git a/tests/query_test/test_scanners_fuzz.py 
b/tests/query_test/test_scanners_fuzz.py
index c336a17..4b97a5f 100644
--- a/tests/query_test/test_scanners_fuzz.py
+++ b/tests/query_test/test_scanners_fuzz.py
@@ -62,7 +62,7 @@ class TestScannersFuzzing(ImpalaTestSuite):
           'mem_limit' : cls.MEM_LIMITS}))
     # TODO: enable for more table formats once they consistently pass the fuzz 
test.
     cls.ImpalaTestMatrix.add_constraint(lambda v:
-        v.get_value('table_format').file_format in ('avro', 'parquet') or
+        v.get_value('table_format').file_format in ('avro', 'parquet', 'orc') 
or
         (v.get_value('table_format').file_format == 'text' and
           v.get_value('table_format').compression_codec in ('none', 'lzo')))

@@ -194,11 +194,12 @@ class TestScannersFuzzing(ImpalaTestSuite):
           continue
         msg = "Should not throw error when abort_on_error=0: '{0}'".format(e)
         LOG.error(msg)
-        # Parquet and compressed text can fail the query for some parse errors.
+        # Parquet, ORC and compressed text can fail the query for some parse 
errors.
         # E.g. corrupt Parquet footer (IMPALA-3773) or a corrupt LZO index file
         # (IMPALA-4013).
         table_format = vector.get_value('table_format')
         if table_format.file_format != 'parquet' \
+            and table_format.file_format != 'orc' \
             and not (table_format.file_format == 'text' and
             table_format.compression_codec != 'none'):
           raise


I ran it for a few iterations and it found at least one crash:

(gdb) bt
#0  0x00007f035c3bc428 in __GI_raise (sig=sig@entry=6) at 
../sysdeps/unix/sysv/linux/raise.c:54
#1  0x00007f035c3be02a in __GI_abort () at abort.c:89
#2  0x00007f035f300069 in ?? () from 
/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64/server/libjvm.so
#3  0x00007f035f4b3997 in ?? () from 
/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64/server/libjvm.so
#4  0x00007f035f4b40b7 in ?? () from 
/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64/server/libjvm.so
#5  0x00007f035f304922 in ?? () from 
/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64/server/libjvm.so
#6  0x00007f035f3094a9 in JVM_handle_linux_signal ()
   from /usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64/server/libjvm.so
#7  0x00007f035f2fd1a8 in ?? () from 
/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64/server/libjvm.so
#8  <signal handler called>
#9  0x00007f035f2fa936 in ?? () from 
/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64/server/libjvm.so
#10 0x00007f035ef10fe1 in ?? () from 
/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64/server/libjvm.so
#11 0x00007f035f4b29b2 in ?? () from 
/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64/server/libjvm.so
#12 0x00007f035f4b34eb in ?? () from 
/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64/server/libjvm.so
#13 0x00007f035f30954f in JVM_handle_linux_signal ()
   from /usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64/server/libjvm.so
#14 0x00007f035f2fd1a8 in ?? () from 
/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64/server/libjvm.so
#15 <signal handler called>
#16 0x0000000003d155e1 in orc::convertType(orc::proto::Type const&, 
orc::proto::Footer const&) ()
#17 0x0000000003d15928 in orc::convertType(orc::proto::Type const&, 
orc::proto::Footer const&) ()
#18 0x0000000003cffbb1 in 
orc::ReaderImpl::ReaderImpl(std::shared_ptr<orc::FileContents>, 
orc::ReaderOptions const&, unsigned long, unsigned long) ()
#19 0x0000000003d026dc in orc::createReader(std::unique_ptr<orc::InputStream, 
std::default_delete<orc::InputStream> >, orc::ReaderOptions const&) ()
#20 0x0000000001ceb163 in impala::HdfsOrcScanner::ProcessFileTail 
(this=0xa9722c0)
    at be/src/exec/hdfs-orc-scanner.cc:227
#21 0x0000000001cea885 in impala::HdfsOrcScanner::Open (this=0xa9722c0, 
context=0x7f02df7bf200)
    at be/src/exec/hdfs-orc-scanner.cc:167
#22 0x0000000001c2e919 in impala::HdfsScanNodeBase::CreateAndOpenScanner 
(this=0xc239500, partition=0xb8ae380,
    context=0x7f02df7bf200, scanner=0x7f02df7bf3d0) at 
be/src/exec/hdfs-scan-node-base.cc:598
#23 0x0000000001c235ad in impala::HdfsScanNode::ProcessSplit (this=0xc239500,
    filter_ctxs=std::vector of length 0, capacity 0, 
expr_results_pool=0x7f02df7bf5b0, scan_range=0x111ed860)
    at be/src/exec/hdfs-scan-node.cc:512
#24 0x0000000001c22d17 in impala::HdfsScanNode::ScannerThread (this=0xc239500)
    at be/src/exec/hdfs-scan-node.cc:437
#25 0x0000000001c2219f in impala::HdfsScanNode::<lambda()>::operator()(void) 
const (__closure=0x7f02df7bfce8)
    at be/src/exec/hdfs-scan-node.cc:350
#26 0x0000000001c2413d in 
boost::detail::function::void_function_obj_invoker0<impala::HdfsScanNode::ThreadTokenAvailableCb(impala::ThreadResourceMgr::ResourcePool*)::<lambda()>,
 void>::invoke(boost::detail::function::function_buffer &) 
(function_obj_ptr=...)
    at 
toolchain/boost-1.57.0-p3/include/boost/function/function_template.hpp:153
#27 0x000000000186f9d8 in boost::function0<void>::operator() 
(this=0x7f02df7bfce0)
    at 
toolchain/boost-1.57.0-p3/include/boost/function/function_template.hpp:767
#28 0x0000000001b7b84f in impala::Thread::SuperviseThread(std::string const&, 
std::string const&, boost::function<void ()>, impala::ThreadDebugInfo const*, 
impala::Promise<long>*) (
    name="scanner-thread (finst:314a746c71f56eb3:9c65ee200000002, 
plan-node-id:0, thread-idx:0)",
    category="fragment-execution", functor=..., 
parent_thread_info=0x7f02e3cce990,
    thread_started=0x7f02e3ccd690) at be/src/util/thread.cc:356
#29 0x0000000001b83d25 in boost::_bi::list5<boost::_bi::value<std::string>, 
boost::_bi::value<std::string>, boost::_bi::value<boost::function<void ()> >, 
boost::_bi::value<impala::ThreadDebugInfo*>, 
boost::_bi::value<impala::Promise<long>*> >::operator()<void (*)(std::string 
const&, std::string const&, boost::function<void ()>, impala::ThreadDebugInfo 
const*, impala::Promise<long>*), boost::_bi::list0>(boost::_bi::type<void>, 
void (*&)(std::string const&, std::string const&, boost::function<void ()>, 
impala::ThreadDebugInfo const*, impala::Promise<long>*), boost::_bi::list0&, 
int) (this=0xbac39c0,
    f=@0xbac39b8: 0x1b7b4e8 <impala::Thread::SuperviseThread(std::string 
const&, std::string const&, boost::function<void ()>, impala::ThreadDebugInfo 
const*, impala::Promise<long>*)>, a=...)
    at toolchain/boost-1.57.0-p3/include/boost/bind/bind.hpp:525
#30 0x0000000001b83c49 in boost::_bi::bind_t<void, void (*)(std::string const&, 
std::string const&, boost::function<void ()>, impala::ThreadDebugInfo const*, 
impala::Promise<long>*), boost::_bi::list5<boost::_bi::value<std::string>, 
boost::_bi::value<std::string>, boost::_bi::value<boost::function<void ()> >, 
boost::_bi::value<impala::ThreadDebugInfo*>, 
boost::_bi::value<impala::Promise<long>*> > >::operator()() (this=0xbac39b8)
    at toolchain/boost-1.57.0-p3/include/boost/bind/bind_template.hpp:20
#31 0x0000000001b83c0c in boost::detail::thread_data<boost::_bi::bind_t<void, 
void (*)(std::string const&, std::string const&, boost::function<void ()>, 
impala::ThreadDebugInfo const*, impala::Promise<long>*), 
boost::_bi::list5<boost::_bi::value<std::string>, 
boost::_bi::value<std::string>, boost::_bi::value<boost::function<void ()> >, 
boost::_bi::value<impala::ThreadDebugInfo*>, 
boost::_bi::value<impala::Promise<long>*> > > >::run() (
    this=0xbac3800) at 
toolchain/boost-1.57.0-p3/include/boost/thread/detail/thread.hpp:116
#32 0x0000000002e17faa in thread_proxy ()
#33 0x00007f035c7586ba in start_thread (arg=0x7f02df7c0700) at 
pthread_create.c:333
#34 0x00007f035c48e41d in clone () at 
../sysdeps/unix/sysv/linux/x86_64/clone.S:109


http://gerrit.cloudera.org:8080/#/c/9134/5//COMMIT_MSG@22
PS5, Line 22:  - Have passed all the tests.
We're also missing test coverage to CHAR and VARCHAR. That looks like it was 
added specially for Parquet/Text/Avro with the chars_formats table, so we might 
need to do something similar for Orc.


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.h
File be/src/exec/hdfs-orc-scanner.h:

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.h@91
PS5, Line 91:   virtual Status Open(ScannerContext *context) WARN_UNUSED_RESULT;
Can we add override to the functions where it fits?


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.h@141
PS5, Line 141: map
Could this be unordered_map? We generally prefer that that unless there's a 
requirement that it be ordered.


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.h@177
PS5, Line 177: inline
My intuition is that many of these "inline" specifiers will not be useful, 
either because the code isn't on the hot path or because they will not affect 
the compiler's decision. I'd consider removing them unless you have evidence 
that they help performance.


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc
File be/src/exec/hdfs-orc-scanner.cc:

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@59
PS5, Line 59: Clear
Let's call this FreeAll() to match the equivalent method on MemPool. 
MemPool::Clear() doesn't free the chunks, just marks them unused.


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@62
PS5, Line 62:  std::
std:: prefix not needed for free().


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@71
PS5, Line 71:   mem_tracker_->Consume(size);
We should do a TryConsume() here so that we don't go over the memory limit. 
That will also make it easier to test the error path.

I think we want to return a proper MemLimitExceeded status. My suggestion is to 
create a special exception class that wraps a Status object. Then when we catch 
it we can unwrap and return the status.


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@72
PS5, Line 72: (std::m
std:: prefix not needed for malloc


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@77
PS5, Line 77:     throw std::runtime_error("memory limit exceeded");
We could do the same status-wrapping thing here.


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@90
PS5, Line 90:   if (ImpaladMetrics::MEM_POOL_TOTAL_BYTES != nullptr) {
Null check shouldn't be needed, right? We don't have it above.


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@182
PS5, Line 182:
Remove extra blank line. There is a bit too much vertical whitespace (i.e. 
blank lines) in this function, so it might be good to remove some.


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@208
PS5, Line 208:  &&
             :       reader_->getCompression() != 
orc::CompressionKind::CompressionKind_NONE
CompressionKind_NONE is already handled in TranslateCompressionKind, right?


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@229
PS5, Line 229: Encounter
Encountered


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@230
PS5, Line 230: Encounter
Encountered.

Can you also include the file name and mention "ORC" in the error message, e.g. 
"Encountered parse error in tail of ORC file /test-warehouse/table/foo.orc: "


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@233
PS5, Line 233:
We can remove most of the blank lines in this function.


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@234
PS5, Line 234:   if (reader_->getNumberOfRows() == 0) {
Nit: can fit if() and return on one line.


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@240
PS5, Line 240:     return Status(Substitute("Invalid file: $0. No stripes in 
this file but numberOfRows"
"Invalid ORC file"


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@250
PS5, Line 250:     case orc::CompressionKind::CompressionKind_NONE:return 
THdfsCompression::NONE;
Nit: missing space


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@259
PS5, Line 259:       VLOG_QUERY << "ORC files in ZSTD compression are counted 
as DEFAULT in profile";
We should return the error in a Status if the compression type isn't handled. 
We want to report the problem to the user in a helpful way.


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@281
PS5, Line 281: nullptr
NULL, not nullptr, since this is just a null indicator bit.


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@291
PS5, Line 291:     
col_id_slot_map_[reader_->getType().getSubtype(col_idx_in_file)->getColumnId()] 
= slot_desc;
nit: long line


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@292
PS5, Line 292:     const ColumnType &col_type = 
scan_node_->hdfs_table()->col_descs()[col_idx].type();
We should validate that the ORC and Impala columns types are compatible at this 
point, instead of trying to handle it later.


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@306
PS5, Line 306: std::m
nit: std:: should not be necessary since this is imported by names.h


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@388
PS5, Line 388:
unnecessary blank line


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@489
PS5, Line 489:
             :           VLOG_QUERY << "Encounter parse error: " << e.what()
             :                      << " which is due to cancellation";
We don't need to log anything if we're cancelling normally. Why do we need to 
handle this case specially anyway? If we hit an error can't we just return that 
error? Impala already will ignore errors that happen after a user cancels the 
query.


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@552
PS5, Line 552: VLOG_FILE
Logging once per batch in VLOG_FILE seems too verbose. Remove or move to 
VLOG_ROW


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@567
PS5, Line 567: // TODO: combine this with the Parquet implementation
Can we do this as part of this patch? I'd prefer to avoid adding duplicate code.


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@590
PS5, Line 590: ColumnVectorBatch
I think the argument to this function should be a StructVectorBatch instead of 
ColumnVectorBatch, because we require that the caller pass in a 
StructVectorBatch.


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@598
PS5, Line 598:     if (slot_desc == nullptr) {
Can this be a DCHECK? Isn't this a bug in our code if the slot doesn't exist?


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@645
PS5, Line 645:           case 
TYPE_FLOAT:*(reinterpret_cast<float*>(slot_val_ptr)) = val;
needs space after :


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@647
PS5, Line 647:           case 
TYPE_DOUBLE:*(reinterpret_cast<double*>(slot_val_ptr)) = val;
needs space after :


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@657
PS5, Line 657:       case orc::TypeKind::CHAR: {
I don't think this handles CHAR(N) or VARCHAR(N) types in Impala correctly.

VARCHAR isn't too difficult - you just need to truncate val_ptr->len to the max 
varchar len.

CHAR has a different memory layout - it is a fixed-size buffer at 'slot'.

I'm ok with deferring work on these, but we should make sure that Impala fails 
gracefully.


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@670
PS5, Line 670:           return 
scan_node_->mem_tracker()->MemLimitExceeded(state_, details, val_ptr->len);
long line


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-orc-scanner.cc@693
PS5, Line 693:           // TODO warn if slot_desc->type().GetByteSize() != 16
I think we should validate the relationship between Impala and ORC types when 
reading the file metadata. If the Impala decimal type is too small, we can just 
return an error instead of truncating the data.


http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-scanner.h
File be/src/exec/hdfs-scanner.h:

http://gerrit.cloudera.org:8080/#/c/9134/5/be/src/exec/hdfs-scanner.h@326
PS5, Line 326:   /// Size of the file footer.  This is a guess.  If this value 
is too little, we will
Mention ORC and Parquet? E.g. "Size of the file footer for ORC and Parquet"


http://gerrit.cloudera.org:8080/#/c/9134/5/testdata/LineItemMultiBlock/lineitem_orc_multiblock_one_stripe.orc
File testdata/LineItemMultiBlock/lineitem_orc_multiblock_one_stripe.orc:

PS5:
These files are pretty big - they'll increase the size of the Impala repo quite 
a bit. Can we reduce the amount of data in them and still get the same test 
coverage?


http://gerrit.cloudera.org:8080/#/c/9134/5/testdata/bin/load_nested.py
File testdata/bin/load_nested.py:

http://gerrit.cloudera.org:8080/#/c/9134/5/testdata/bin/load_nested.py@297
PS5, Line 297: def load_orc():
I don't think this is the best way to load the data - I think we should add it 
elsewhere. I'm not sure what the best approach is so I asked Joe McDonnell to 
comment.



--
To view, visit http://gerrit.cloudera.org:8080/9134
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ia7b6ae4ce3b9ee8125b21993702faa87537790a4
Gerrit-Change-Number: 9134
Gerrit-PatchSet: 5
Gerrit-Owner: Quanlong Huang <huangquanl...@gmail.com>
Gerrit-Reviewer: Dan Hecht <dhe...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <huangquanl...@gmail.com>
Gerrit-Reviewer: Tim Armstrong <tarmstr...@cloudera.com>
Gerrit-Comment-Date: Wed, 14 Mar 2018 23:00:01 +0000
Gerrit-HasComments: Yes

Reply via email to