Hey, I tested the suggestion here and adapted the code to read a stream in CSV format. But in my tests the method OnRecordBatchDecoded is never called, and my understanding is that it expects the IPC format while I am reading CSV format?
Am I missing something?

In the meantime, to reply to this thread (https://lists.apache.org/thread/5rpykkfoz416mq889pcpx9rwrrtjog60 on dev@, to connect the existing thread), do I only need to reply to [email protected]?

    #include <arrow/api.h>
    #include <arrow/ipc/api.h>

    // ArrowFilter is a class from my own project, not part of Arrow.
    class MyListener : public arrow::ipc::Listener {
     public:
      arrow::Status OnRecordBatchDecoded(
          std::shared_ptr<arrow::RecordBatch> record_batch) override {
        ArrowFilter arrow_filter = ArrowFilter(record_batch);
        arrow_filter.ToCsv();
        // The override must return a Status.
        return arrow::Status::OK();
      }
    };

Thanks

On Wed, Jul 13, 2022 at 06:50, Sutou Kouhei <[email protected]> wrote:
> Could you resend your reply to
> https://lists.apache.org/thread/5rpykkfoz416mq889pcpx9rwrrtjog60
> on dev@ to connect the existing thread?
>
> In <CAJdzkC2Rdz9wfM1_a3V4TqWF-U-3gs0TztHfdpkvKcxphdx=d...@mail.gmail.com>
>   "Re: StreamReader" on Tue, 12 Jul 2022 10:01:00 +0200,
>   L Ait <[email protected]> wrote:
>
> > Thank you, I will look into that.
> > The real problem is that I read data in chunks and the end of each chunk
> > is truncated (not a complete line). I need to wait for the next chunk to
> > complete the line.
> >
> > Is there an approach you would suggest for processing the chunks smoothly?
> >
> > Thank you
> >
> >
> > On Fri, Jul 8, 2022 at 03:37, Sutou Kouhei <[email protected]> wrote:
> >
> >> Answered on dev@:
> >> https://lists.apache.org/thread/5rpykkfoz416mq889pcpx9rwrrtjog60
> >>
> >> In <CAJdzkC04+Uxa6bdmozPQFDkQ07M4Q=fmuhh2gvqzz-na2lm...@mail.gmail.com>
> >>   "StreamReader" on Sat, 2 Jul 2022 16:04:45 +0200,
> >>   L Ait <[email protected]> wrote:
> >>
> >> > Hi,
> >> >
> >> > I need help integrating Arrow C++ into my current project. I have
> >> > already built the C++ library and can call the API.
> >> >
> >> > What I need is this:
> >> >
> >> > I have a C++ project that reads data in chunks and then uses an erasure
> >> > code to rebuild the original data.
> >> >
> >> > The rebuild is done in chunks. At each iteration I can access a buffer
> >> > of rebuilt data.
> >> >
> >> > My need is to pass this data as a stream to Arrow for processing and
> >> > then send the processed stream on.
> >> >
> >> > For example, if my original file is a CSV and I would like to filter
> >> > and save the first column:
> >> >
> >> > file:
> >> >
> >> >     col1,col2,col3,col4
> >> >     a1,b1,c1,d1
> >> >     an,bn,cn,dn
> >> >
> >> > split into 6 chunks of equal size:
> >> >
> >> > chunk1:
> >> >
> >> >     a1,b1,c1,d1
> >> >     ak,bk
> >> >
> >> > chunk2:
> >> >
> >> >     ck,dk
> >> >     ...
> >> >     am,bm,cm,dm
> >> >
> >> > and so on.
> >> >
> >> > My question is which StreamReader in Arrow is the right one to use, and
> >> > how it deals with incomplete records (lines) at the beginning and end of
> >> > each chunk?
> >> >
> >> > Here is a snippet of the code I use:
> >> >
> >> >     buffer_type_t res = fut.get0();
> >> >     BOOST_LOG_TRIVIAL(trace)
> >> >         << "RawxBackendReader: Got result with buffer size: " << res.size();
> >> >     std::shared_ptr<arrow::io::InputStream> input;
> >> >
> >> >     std::shared_ptr<arrow::io::BufferReader> buffer(new arrow::io::BufferReader(
> >> >         reinterpret_cast<const uint8_t*>(res.get()), res.size()));
> >> >     input = buffer;
> >> >     BOOST_LOG_TRIVIAL(trace) << "laa type input" << input.get();
> >> >
> >> >     ArrowFilter arrow_filter = ArrowFilter(input);
> >> >     arrow_filter.ToCsv();
> >> >
> >> >     result.push_back(std::move(res));
> >> >
> >> > Thank you
> >> >
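On the first question, a sketch of the reasoning rather than a confirmed diagnosis: `arrow::ipc::Listener` callbacks are driven by `arrow::ipc::StreamDecoder`, which parses the Arrow IPC streaming format. According to the Arrow columnar format specification, each encapsulated IPC message (Arrow 0.15 and later) begins with the 4-byte continuation marker `0xFFFFFFFF`, followed by a little-endian metadata length. CSV text never starts that way, so it cannot form a valid IPC message and `OnRecordBatchDecoded` would not be reached. The helper `LooksLikeArrowIpcStream` below is a hypothetical name, not an Arrow API. It checks only for that marker and does not detect legacy streams written without it.

```cpp
#include <cassert>
#include <cstddef>
#include <string_view>

// Hypothetical helper (not part of Arrow): true if `data` starts with the
// 0xFFFFFFFF continuation marker that precedes each encapsulated message in
// an Arrow IPC stream (format >= 0.15). Legacy streams are not detected.
bool LooksLikeArrowIpcStream(std::string_view data) {
  if (data.size() < 4) {
    return false;
  }
  for (std::size_t i = 0; i < 4; ++i) {
    // Compare as unsigned bytes; plain char may be signed.
    if (static_cast<unsigned char>(data[i]) != 0xFF) {
      return false;
    }
  }
  return true;
}
```

For CSV input, Arrow C++ has its own reader, `arrow::csv::StreamingReader`, which produces record batches from an `arrow::io::InputStream`. Check its documentation for the exact `Make()` signature in your Arrow version.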
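For the chunk-boundary problem from the July 12 message, one common approach, independent of Arrow, is to carry the incomplete trailing line of each chunk over and prepend it to the next chunk, so that only complete lines are ever processed. The sketch below shows this in plain C++. Its push-style `Consume()` and callback shape loosely mirrors the listener pattern. `CsvLineDecoder` and `FirstColumn` are hypothetical names. The sketch assumes `\n`-terminated lines and no quoted fields that contain newlines.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <string_view>
#include <utility>

// Hypothetical push-style decoder (not an Arrow API). Each Consume() call gets
// one rebuilt chunk; every complete '\n'-terminated line goes to the callback,
// and an incomplete trailing line is carried over to the next chunk.
// Assumptions: no quoted fields with embedded newlines; a trailing '\r' from
// CRLF files is left in the line.
class CsvLineDecoder {
 public:
  using LineCallback = std::function<void(std::string_view line)>;

  explicit CsvLineDecoder(LineCallback on_line) : on_line_(std::move(on_line)) {
    assert(static_cast<bool>(on_line_) && "a line callback is required");
  }

  void Consume(std::string_view chunk) {
    pending_.append(chunk.data(), chunk.size());
    std::size_t start = 0;
    std::size_t newline;
    while ((newline = pending_.find('\n', start)) != std::string::npos) {
      // The view is only valid during the callback; copy it to keep it.
      on_line_(std::string_view(pending_).substr(start, newline - start));
      start = newline + 1;
    }
    // Keep only the incomplete tail for the next chunk.
    pending_.erase(0, start);
  }

  // Call after the last chunk so a final line without '\n' is not lost.
  void Finish() {
    if (!pending_.empty()) {
      on_line_(pending_);
      pending_.clear();
    }
  }

 private:
  LineCallback on_line_;
  std::string pending_;  // carried-over bytes of an incomplete line
};

// Example per-line processing for the "keep the first column" case
// (hypothetical helper).
std::string FirstColumn(std::string_view line) {
  return std::string(line.substr(0, line.find(',')));
}
```

With the chunks from the original example, a chunk ending in `ak,bk,` followed by one starting with `ck,dk` yields the single line `ak,bk,ck,dk`. The complete lines could then be batched into a buffer for an Arrow CSV reader. That step is not shown here.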
