Hi,
I can't understand why you want to mix dev@ and user@
mailing lists... Anyway...
Sorry. I misunderstood. I thought that your input is Apache
Arrow format and your output is CSV. You can't use
arrow::ipc::RecordBatchStreamReader for CSV. You need to use
arrow::csv::StreamingReader:
  buffer_type_t res = fut.get0();
  BOOST_LOG_TRIVIAL(trace) <<
    "RawxBackendReader: Got result with buffer size: " << res.size();
  auto input = std::make_shared<arrow::io::BufferReader>(
    reinterpret_cast<const uint8_t*>(res.get()),
    res.size());
  BOOST_LOG_TRIVIAL(trace) << "laa type input" << input.get();
  auto io_context = arrow::io::IOContext(arrow::default_memory_pool());
  auto read_options = arrow::csv::ReadOptions::Defaults();
  auto parse_options = arrow::csv::ParseOptions::Defaults();
  auto convert_options = arrow::csv::ConvertOptions::Defaults();
  auto reader_result =
    arrow::csv::StreamingReader::Make(io_context,
                                      input,
                                      read_options,
                                      parse_options,
                                      convert_options);
  if (!reader_result.ok()) {
    // Report reader_result.status() here as needed.
    exit(1);
  }
  auto reader = *reader_result;
  for (auto record_batch_result : *reader) {
    if (!record_batch_result.ok()) {
      exit(1);
    }
    auto record_batch = *record_batch_result;
    // Filter record_batch and write CSV.
    // You can use arrow::csv::MakeCSVWriter() to write a CSV.
  }
  result.push_back(std::move(res));
Thanks,
--
kou
In <cajdzkc3rykqtvjpdmea7f6fz5mi4wvf1ukk0q0zgffn_xaz...@mail.gmail.com>
"Re: StreamReader" on Mon, 18 Jul 2022 10:31:08 +0200,
L Ait <[email protected]> wrote:
> Hey,
>
> I tested the suggestion here, adapting the code to read the stream in CSV
> format.
> But in my tests the method OnRecordBatchDecoded is never called, and my
> understanding is that it waits for the IPC format while I am reading the
> CSV format?
>
> Am I missing something?
>
> In the meantime, in order to reply to this thread, do I only need to reply
> to [email protected] ?
> https://lists.apache.org/thread/5rpykkfoz416mq889pcpx9rwrrtjog60
> on dev@ to connect the existing thread?
>
> class MyListener : public arrow::ipc::Listener {
>  public:
>   arrow::Status
>   OnRecordBatchDecoded(std::shared_ptr<arrow::RecordBatch> record_batch)
>       override {
>     ArrowFilter arrow_filter = ArrowFilter(record_batch);
>     arrow_filter.ToCsv();
>     return arrow::Status::OK();
>   }
> };
>
>
> Thanks
>
>
> On Wed, Jul 13, 2022 at 06:50, Sutou Kouhei <[email protected]> wrote:
>
>> Could you resend your reply to
>> https://lists.apache.org/thread/5rpykkfoz416mq889pcpx9rwrrtjog60
>> on dev@ to connect the existing thread?
>>
>> In <CAJdzkC2Rdz9wfM1_a3V4TqWF-U-3gs0TztHfdpkvKcxphdx=d...@mail.gmail.com>
>> "Re: StreamReader" on Tue, 12 Jul 2022 10:01:00 +0200,
>> L Ait <[email protected]> wrote:
>>
>> > Thank you, I will look into that.
>> > The real problem is that I read data in chunks and the end of the chunk
>> > is truncated (not a complete line). I need to wait for the next chunk
>> > to have the line completion.
>> >
>> > Is there a way you suggest to process the chunks smoothly?
>> >
>> > Thank you
>> >
>> >
>> > On Fri, Jul 8, 2022 at 03:37, Sutou Kouhei <[email protected]> wrote:
>> >
>> >> Answered on dev@:
>> >> https://lists.apache.org/thread/5rpykkfoz416mq889pcpx9rwrrtjog60
>> >>
>> >> In <CAJdzkC04+Uxa6bdmozPQFDkQ07M4Q=fmuhh2gvqzz-na2lm...@mail.gmail.com>
>> >> "StreamReader" on Sat, 2 Jul 2022 16:04:45 +0200,
>> >> L Ait <[email protected]> wrote:
>> >>
>> >> > Hi,
>> >> >
>> >> > I need help integrating Arrow C++ into my current project. I have
>> >> > built the C++ library and can call the API.
>> >> >
>> >> > What I need is this:
>> >> >
>> >> > I have a C++ project that reads data in chunks, then uses an erasure
>> >> > code to rebuild the original data.
>> >> >
>> >> > The rebuild is done in chunks. At each iteration I can access a
>> >> > buffer of rebuilt data.
>> >> >
>> >> > My need is to pass this data as a stream to an Arrow process, then
>> >> > send the processed stream.
>> >> >
>> >> > For example, if my original file is a CSV and I would like to filter
>> >> > and save the first column:
>> >> >
>> >> > file
>> >> >
>> >> > col1,col2,col3,col4
>> >> > a1,b1,c1,d1
>> >> > an,bn,cn,dn
>> >> >
>> >> > split into 6 chunks of equal size, chunk1:
>> >> >
>> >> > a1,b1,c1,d1
>> >> > ak,bk
>> >> >
>> >> > chunk2:
>> >> >
>> >> > ck,dk
>> >> > ...
>> >> > am,bm,cm,dm
>> >> >
>> >> > and so on.
>> >> >
>> >> > My question is: how do I use the right StreamReader in Arrow, and how
>> >> > does it deal with incomplete records (lines) at the beginning and end
>> >> > of each chunk?
>> >> >
>> >> > Here is a snippet of the code I use:
>> >> > buffer_type_t res = fut.get0();
>> >> > BOOST_LOG_TRIVIAL(trace) <<
>> >> >   "RawxBackendReader: Got result with buffer size: " << res.size();
>> >> > std::shared_ptr<arrow::io::InputStream> input;
>> >> >
>> >> > std::shared_ptr<arrow::io::BufferReader> buffer(
>> >> >   new arrow::io::BufferReader(
>> >> >     reinterpret_cast<const uint8_t*>(res.get()), res.size()));
>> >> > input = buffer;
>> >> > BOOST_LOG_TRIVIAL(trace) << "laa type input" << input.get();
>> >> >
>> >> > ArrowFilter arrow_filter = ArrowFilter(input);
>> >> > arrow_filter.ToCsv();
>> >> >
>> >> >
>> >> > result.push_back(std::move(res));
>> >> >
>> >> > Thank you
>> >>
>>