I changed my filtering method as follows and hit the same core dump; filter_expression_ seems to stop working. I use my own function ParseExpression to parse a string into an Expression, then ReadFile to read a Parquet file into RecordBatches. The error only occurs when I call ReadFile on the background thread; if I instead call ParseExpression on the background thread to create a fresh Expression, the error goes away.
Likewise, if I don't use the background thread at all, there is no error: the filter works and there is no core dump. It's very strange.

This is how I call ReadFile on the background thread:

    background_worker_->Schedule(std::bind(&Iterator::ReadFile, this,
                                           current_file_idx_ + 1, true));

Since std::bind binds this, filter_expression_ should be reachable from the worker thread; why doesn't it seem to work?

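For reference, a minimal sketch of the workaround I described above. ReadFileWithLocalFilter is a hypothetical simplification of the real ReadFile below; the only change is that the filter string is re-parsed inside the background-thread call instead of reusing the member filter_expression_ parsed on the main thread:

    // Hypothetical variant of ReadFile(): parse the filter string on the
    // calling (background) thread so that thread binds its own Expression,
    // instead of reusing filter_expression_ from SetupStreamsLocked().
    Status ReadFileWithLocalFilter(int file_index, bool background = false) {
      arrow::compute::Expression local_filter;
      TF_RETURN_IF_ERROR(
          ArrowUtil::ParseExpression(dataset()->filter_, local_filter));
      // ... build the same source -> filter -> sink ExecPlan as in ReadFile(),
      // passing local_filter to FilterNodeOptions instead of
      // this->filter_expression_ ...
      return Status::OK();
    }

When I schedule this variant on background_worker_, the core dump does not reproduce.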


code:


    class Iterator : public ArrowBaseIterator<Dataset> {
     public:
      explicit Iterator(const Params& params)
          : ArrowBaseIterator<Dataset>(params) {}

     private:
      Status SetupStreamsLocked(Env* env)
          TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override {
        if (!s3fs_) {
          arrow::fs::EnsureS3Initialized();
          auto s3Options = arrow::fs::S3Options::FromAccessKey(
              dataset()->aws_access_key_, dataset()->aws_secret_key_);
          s3Options.endpoint_override = dataset()->aws_endpoint_override_;
          s3fs_ = arrow::fs::S3FileSystem::Make(s3Options).ValueOrDie();
          if (!dataset()->filter_.empty()) {
            std::cout << "before parse expression" << std::endl;
            TF_RETURN_IF_ERROR(ArrowUtil::ParseExpression(dataset()->filter_,
                                                          filter_expression_));
            std::cout << "after parse expression" << std::endl;
          }
        }
        TF_RETURN_IF_ERROR(ReadFile(current_file_idx_));
#if 0
        if (current_batch_idx_ < record_batches_.size()) {
          current_batch_ = record_batches_[current_batch_idx_];
        } else {
          current_batch_ = nullptr;
        }
#endif

#if 1
        if (!background_worker_) {
          background_worker_ =
              std::make_shared<BackgroundWorker>(env, "download_next_worker");
        }

        if (current_batch_idx_ < record_batches_.size()) {
          current_batch_ = record_batches_[current_batch_idx_];
        }

        if (current_file_idx_ + 1 < dataset()->parquet_files_.size()) {
          background_worker_->Schedule(std::bind(&Iterator::ReadFile, this,
                                                 current_file_idx_ + 1, true));
        }
#endif
        return Status::OK();
      }

      Status NextStreamLocked(Env* env)
          TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override {
        ArrowBaseIterator<Dataset>::NextStreamLocked(env);
        if (++current_batch_idx_ < record_batches_.size()) {
          current_batch_ = record_batches_[current_batch_idx_];
        } else if (++current_file_idx_ < dataset()->parquet_files_.size()) {
          current_batch_idx_ = 0;

#if 0
          record_batches_.clear();
          return SetupStreamsLocked(env);
#endif

#if 1
          {
            mutex_lock lk(cv_mu_);
            while (!background_thread_finished_) {
              cv_.wait(lk);
            }
          }

          record_batches_.swap(next_record_batches_);
          if (!record_batches_.empty()) {
            current_batch_ = record_batches_[current_batch_idx_];
          } else {
            current_batch_ = nullptr;
          }
          background_thread_finished_ = false;
          if (current_file_idx_ + 1 < dataset()->parquet_files_.size()) {
            background_worker_->Schedule(std::bind(
                &Iterator::ReadFile, this, current_file_idx_ + 1, true));
          }
#endif
        }
        return Status::OK();
      }

      void ResetStreamsLocked() TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override {
        ArrowBaseIterator<Dataset>::ResetStreamsLocked();
        current_file_idx_ = 0;
        current_batch_idx_ = 0;
        record_batches_.clear();
        next_record_batches_.clear();
      }

      Status ReadFile(int file_index, bool background = false)
          TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
        auto access_file =
            s3fs_->OpenInputFile(dataset()->parquet_files_[file_index])
                .ValueOrDie();

        parquet::ArrowReaderProperties properties;
        properties.set_use_threads(true);
        properties.set_pre_buffer(true);
        parquet::ReaderProperties parquet_properties =
            parquet::default_reader_properties();

        std::shared_ptr<parquet::arrow::FileReaderBuilder> builder =
            std::make_shared<parquet::arrow::FileReaderBuilder>();
        builder->Open(access_file, parquet_properties);

        std::unique_ptr<parquet::arrow::FileReader> reader;
        builder->properties(properties)->Build(&reader);

        if (column_indices_.empty()) {
          std::shared_ptr<arrow::Schema> schema;
          reader->GetSchema(&schema);
          // Check that every requested column name exists.
          std::string err_column_names;
          for (const auto& name : dataset()->column_names_) {
            int fieldIndex = schema->GetFieldIndex(name);
            column_indices_.push_back(fieldIndex);
            if (-1 == fieldIndex) {
              err_column_names = err_column_names + " " + name;
            }
          }

          if (err_column_names.length() != 0) {
            return errors::InvalidArgument("these column names don't exist: ",
                                           err_column_names);
          }
        }
        // Read file columns and build a table.
        std::shared_ptr<::arrow::Table> table;
        CHECK_ARROW(reader->ReadTable(column_indices_, &table));
        // Convert the table to a sequence of batches.
        auto tr = std::make_shared<arrow::TableBatchReader>(*table.get());

        using namespace arrow::compute;
        // Ensure arrow::dataset node factories are in the registry.
        // arrow::dataset::internal::Initialize();
        // Execution context.
        ExecContext exec_context;
        auto plan = ExecPlan::Make(&exec_context).ValueOrDie();
        arrow::AsyncGenerator<arrow::util::optional<ExecBatch>> sink_gen;
        auto source_node_options = arrow::compute::SourceNodeOptions{
            table->schema(), [tr]() {
              using ExecBatch = arrow::compute::ExecBatch;
              using ExecBatchOptional = arrow::util::optional<ExecBatch>;
              auto arrow_record_batch_result = tr->Next();
              if (!arrow_record_batch_result.ok()) {
                std::cout << "end1" << std::endl;
                return arrow::AsyncGeneratorEnd<ExecBatchOptional>();
              }
              auto arrow_record_batch = std::move(*arrow_record_batch_result);
              if (!arrow_record_batch) {
                std::cout << "end2" << std::endl;
                return arrow::AsyncGeneratorEnd<ExecBatchOptional>();
              }
              std::cout << "num rows: " << arrow_record_batch->num_rows()
                        << std::endl;
              return arrow::Future<ExecBatchOptional>::MakeFinished(
                  ExecBatch(*arrow_record_batch));
            }};
        auto source_node =
            MakeExecNode("source", plan.get(), {}, source_node_options)
                .ValueOrDie();
        // std::cout << "filter:   " << filter_expression_.ToString() << std::endl;
        // arrow::compute::Expression filter_expr;
        // TF_RETURN_IF_ERROR(ArrowUtil::ParseExpression(dataset()->filter_, filter_expr));

        auto filter_node =
            MakeExecNode("filter", plan.get(), {source_node},
                         FilterNodeOptions{this->filter_expression_})
                .ValueOrDie();

        MakeExecNode("sink", plan.get(), {filter_node},
                     SinkNodeOptions{&sink_gen});

        std::shared_ptr<::arrow::Schema> schema = table->schema();
        auto sink_reader = MakeGeneratorReader(schema, std::move(sink_gen),
                                               exec_context.memory_pool());
        plan->Validate();
        std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
        // Start the ExecPlan.
        plan->StartProducing();

        std::shared_ptr<arrow::Table> response_table;
        response_table =
            arrow::Table::FromRecordBatchReader(sink_reader.get()).ValueOrDie();

        std::cout << "Results : " << response_table->ToString() << std::endl;

        // Stop producing.
        plan->StopProducing();
        // Mark the plan finished.
        auto future = plan->finished();

        auto ttr =
            std::make_shared<arrow::TableBatchReader>(*response_table.get());

        // filter
        // auto scanner_builder =
        //     arrow::dataset::ScannerBuilder::FromRecordBatchReader(tr);
        // scanner_builder->UseThreads(false);
        // if (!dataset()->filter_.empty()) {
        //   std::cout << filter_expression_.ToString() << std::endl;
        //   scanner_builder->Filter(filter_expression_);
        // }
        std::shared_ptr<arrow::RecordBatch> batch = nullptr;
        // auto scanner = scanner_builder->Finish().ValueOrDie();
        // auto batch_reader = scanner->ToRecordBatchReader().ValueOrDie();
        CHECK_ARROW(ttr->ReadNext(&batch));
        TF_RETURN_IF_ERROR(CheckBatchColumnTypes(batch));
        next_record_batches_.clear();
        while (batch != nullptr) {
          if (!background) {
            record_batches_.emplace_back(batch);
          } else {
            next_record_batches_.emplace_back(batch);
          }
          CHECK_ARROW(ttr->ReadNext(&batch));
        }

        if (background) {
          mutex_lock lk(cv_mu_);
          background_thread_finished_ = true;
          cv_.notify_all();
        }

        return Status::OK();
      }

      size_t current_file_idx_ TF_GUARDED_BY(mu_) = 0;
      size_t current_batch_idx_ TF_GUARDED_BY(mu_) = 0;
      std::vector<std::shared_ptr<arrow::RecordBatch>> record_batches_
          TF_GUARDED_BY(mu_);
      std::vector<std::shared_ptr<arrow::RecordBatch>> next_record_batches_
          TF_GUARDED_BY(mu_);
      std::shared_ptr<arrow::fs::S3FileSystem> s3fs_ TF_GUARDED_BY(mu_) =
          nullptr;
      std::vector<int> column_indices_ TF_GUARDED_BY(mu_);
      std::shared_ptr<BackgroundWorker> background_worker_ = nullptr;
      mutex cv_mu_;
      condition_variable cv_;
      bool background_thread_finished_ = false;
      arrow::compute::Expression filter_expression_;
    };


bt:
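(In the frames below, the abort happens while FilterNode::Make binds the filter expression, frames #9-#10, called from Iterator::ReadFile running on the BackgroundWorker thread, frames #14 and #20.)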


#0  0x00007ffff7e0c18b in raise () from /usr/lib/x86_64-linux-gnu/libc.so.6
#1  0x00007ffff7deb859 in abort () from /usr/lib/x86_64-linux-gnu/libc.so.6
#2  0x00007ff82929e2b0 in arrow::util::CerrLog::~CerrLog (this=0x7ffad40d40b0, __in_chrg=<optimized out>) at external/arrow/cpp/src/arrow/util/logging.cc:72
#3  0x00007ff82929e2d0 in arrow::util::CerrLog::~CerrLog (this=0x7ffad40d40b0, __in_chrg=<optimized out>) at external/arrow/cpp/src/arrow/util/logging.cc:74
#4  0x00007ff82929e0ef in arrow::util::ArrowLog::~ArrowLog (this=0x7ff804f83d60, __in_chrg=<optimized out>) at external/arrow/cpp/src/arrow/util/logging.cc:250
#5  0x00007ff828f949cb in arrow::compute::CallNotNull (expr=...) at external/arrow/cpp/src/arrow/compute/exec/expression_internal.h:40
#6  0x00007ff828f8f3e0 in arrow::compute::(anonymous namespace)::BindImpl<arrow::Schema> (expr=..., in=..., shape=arrow::ValueDescr::ARRAY, exec_context=0x7ff804f84420) at external/arrow/cpp/src/arrow/compute/exec/expression.cc:458
#7  0x00007ff828f8f4bb in arrow::compute::(anonymous namespace)::BindImpl<arrow::Schema> (expr=..., in=..., shape=arrow::ValueDescr::ARRAY, exec_context=0x7ff804f84420) at external/arrow/cpp/src/arrow/compute/exec/expression.cc:460
#8  0x00007ff828f8efe7 in arrow::compute::(anonymous namespace)::BindImpl<arrow::Schema> (expr=..., in=..., shape=arrow::ValueDescr::ARRAY, exec_context=0x0) at external/arrow/cpp/src/arrow/compute/exec/expression.cc:441
#9  0x00007ff828f820bd in arrow::compute::Expression::Bind (this=0x7ff804f846a0, in_schema=..., exec_context=0x0) at external/arrow/cpp/src/arrow/compute/exec/expression.cc:476
#10 0x00007ff828fc3693 in arrow::compute::(anonymous namespace)::FilterNode::Make (plan=0x7ffad40d63f0, inputs=std::vector of length 1, capacity 1 = {...}, options=...) at external/arrow/cpp/src/arrow/compute/exec/filter_node.cc:53
#11 0x00007ff828fc5b06 in std::_Function_handler<arrow::Result<arrow::compute::ExecNode*> (arrow::compute::ExecPlan*, std::vector<arrow::compute::ExecNode*, std::allocator<arrow::compute::ExecNode*> >, arrow::compute::ExecNodeOptions const&), arrow::Result<arrow::compute::ExecNode*> (*)(arrow::compute::ExecPlan*, std::vector<arrow::compute::ExecNode*, std::allocator<arrow::compute::ExecNode*> >, arrow::compute::ExecNodeOptions const&)>::_M_invoke(std::_Any_data const&, arrow::compute::ExecPlan*&&, std::vector<arrow::compute::ExecNode*, std::allocator<arrow::compute::ExecNode*> >&&, arrow::compute::ExecNodeOptions const&) (__functor=..., __args#0=@0x7ff804f847c8: 0x7ffad40d63f0, __args#1=..., __args#2=...) at /usr/include/c++/9/bits/std_function.h:286
#12 0x00007ff828be00d7 in std::function<arrow::Result<arrow::compute::ExecNode*> (arrow::compute::ExecPlan*, std::vector<arrow::compute::ExecNode*, std::allocator<arrow::compute::ExecNode*> >, arrow::compute::ExecNodeOptions const&)>::operator()(arrow::compute::ExecPlan*, std::vector<arrow::compute::ExecNode*, std::allocator<arrow::compute::ExecNode*> >, arrow::compute::ExecNodeOptions const&) const (this=0x7ff804f848a0, __args#0=0x7ffad40d63f0, __args#1=std::vector of length 0, capacity 0, __args#2=...) at /usr/include/c++/9/bits/std_function.h:688
#13 0x00007ff828bd251c in arrow::compute::MakeExecNode (Python Exception <class 'gdb.error'> No type named class std::basic_string<char, std::char_traits<char>, std::allocator<char> >::_Rep.: factory_name=, plan=0x7ffad40d63f0, inputs=std::vector of length 0, capacity 0, options=..., registry=0x7ff82d1ac960 <arrow::compute::default_exec_factory_registry()::instance>) at external/arrow/cpp/src/arrow/compute/exec/exec_plan.h:360
#14 0x00007ff828bde6e2 in tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::ReadFile (this=0x18471300, file_index=1, background=true) at tensorflow_io/core/kernels/arrow/arrow_dataset_ops.cc:1248
#15 0x00007ff828bedcbc in std::__invoke_impl<tensorflow::Status, tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*&)(int, bool), tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*&, unsigned long&, bool&> (__f=@0x19161d80: (class tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*)(class tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator * const, int, bool)) 0x7ff828bdddfa <tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::ReadFile(int, bool)>, __t=@0x19161da0: 0x18471300) at /usr/include/c++/9/bits/invoke.h:73
#16 0x00007ff828bec6b2 in std::__invoke<tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*&)(int, bool), tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*&, unsigned long&, bool&> (__fn=@0x19161d80: (class tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*)(class tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator * const, int, bool)) 0x7ff828bdddfa <tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::ReadFile(int, bool)>) at /usr/include/c++/9/bits/invoke.h:96
#17 0x00007ff828beadb2 in std::_Bind<tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*(tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*, unsigned long, bool))(int, bool)>::__call<tensorflow::Status, , 0ul, 1ul, 2ul>(std::tuple<>&&, std::_Index_tuple<0ul, 1ul, 2ul>) (this=0x19161d80, __args=...) at /usr/include/c++/9/functional:402
#18 0x00007ff828be916c in std::_Bind<tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*(tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*, unsigned long, bool))(int, bool)>::operator()<, tensorflow::Status>() (this=0x19161d80) at /usr/include/c++/9/functional:484
#19 0x00007ff828be6633 in std::_Function_handler<void (), std::_Bind<tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*(tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*, unsigned long, bool))(int, bool)> >::_M_invoke(std::_Any_data const&) (__functor=...) at /usr/include/c++/9/bits/std_function.h:300
#20 0x00007ffd92bd68ea in tensorflow::data::BackgroundWorker::WorkerLoop() () from /usr/local/lib/python3.8/dist-packages/tensorflow/python/../libtensorflow_framework.so.2
#21 0x00007ffd9338f5d8 in tensorflow::(anonymous namespace)::PThread::ThreadFn(void*) () from /usr/local/lib/python3.8/dist-packages/tensorflow/python/../libtensorflow_framework.so.2
#22 0x00007ffff7dac609 in start_thread (arg=<optimized out>) at pthread_create.c:477
#23 0x00007ffff7ee8293 in clone () from /usr/lib/x86_64-linux-gnu/libc.so.6


1057445597
[email protected]


------------------ Original Message ------------------
From: "user" <[email protected]>;
Sent: Wednesday, July 27, 2022, 00:57;
To: "user" <[email protected]>;

Subject: Re: how to filter a table to select rows



Hi 1057445597, could you provide more information about your core dump? What backtrace does it give? I notice you’re not checking the Status returned by scanner_builder->Filter. That could be a place to start.
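(For illustration, a minimal sketch of that check; the helper name is hypothetical, and ARROW_RETURN_NOT_OK is Arrow's standard status-propagation macro:

    // Propagate the Status from Filter() instead of silently dropping it.
    arrow::Status ApplyFilter(arrow::dataset::ScannerBuilder& builder,
                              const arrow::compute::Expression& filter) {
      ARROW_RETURN_NOT_OK(builder.Filter(filter));
      return arrow::Status::OK();
    }

If Filter() returns a non-OK Status, surfacing it may reveal the problem earlier than the core dump does.)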


Sasha Krassovsky


On Jul 26, 2022, at 8:27 AM, Aldrin <[email protected]> wrote:

You can create an InMemoryDataset from a RecordBatch. See [1] for docs and [2] 
for example code. You may be able to find something similar for filtering 
tables.



[1]: https://arrow.apache.org/docs/cpp/api/dataset.html#_CPPv4N5arrow7dataset15InMemoryDataset15InMemoryDatasetENSt10shared_ptrI6SchemaEE17RecordBatchVector
[2]: https://gitlab.com/skyhookdm/skytether-singlecell/-/blob/mainline/src/cpp/processing/operators.cpp#L50
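
For instance, a minimal sketch along the lines of [1]; the function and variable names here are placeholders, not from the thread:

    // Wrap a RecordBatch in an InMemoryDataset and filter it with a Scanner.
    #include <arrow/dataset/api.h>

    arrow::Result<std::shared_ptr<arrow::Table>> FilterBatch(
        const std::shared_ptr<arrow::RecordBatch>& batch,
        const arrow::compute::Expression& filter_expression) {
      auto dataset = std::make_shared<arrow::dataset::InMemoryDataset>(
          batch->schema(), arrow::RecordBatchVector{batch});
      ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
      ARROW_RETURN_NOT_OK(scanner_builder->Filter(filter_expression));
      ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());
      return scanner->ToTable();  // Rows not matching the filter are dropped.
    }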


Aldrin Montana
Computer Science PhD Student
UC Santa Cruz


On Mon, Jul 25, 2022 at 8:49 PM 1057445597 <[email protected]> wrote:

I use the following code to filter a table, but it always core dumps at scanner_builder->Filter(filter_expression_). Is there a better way to filter a table, or a RecordBatch?

By the way, dataset::ScannerBuilder always core dumps when I use it in tfio to create a TensorFlow dataset; it's most likely buggy.

        // Read file columns and build a table
        std::shared_ptr<::arrow::Table> table;
        CHECK_ARROW(reader->ReadTable(column_indices_, &table));
        // Convert the table to a sequence of batches
        auto tr = std::make_shared<arrow::TableBatchReader>(*table.get());

        // filter
        auto scanner_builder =
            arrow::dataset::ScannerBuilder::FromRecordBatchReader(tr);
        if (!dataset()->filter_.empty()) {
          std::cout << filter_expression_.ToString() << std::endl;
          scanner_builder->Filter(filter_expression_);
        }


1057445597
[email protected]