Hi Wes,
My C++ code is attached. I tried to also read it from C++ by opening the disk
file as a MemoryMappedFile and get the same error when I make a
RecordBatchReader on the mmap'ed file, ie, "not an Arrow file".
There must be some magical sequence of writes needed to make the file kosher.
Thanks for helping.
Ken
p.s. I read your blog about relocating to Nashville. Was my stomping grounds
back in the 80s. Memories.
________________________________
From: Wes McKinney <[email protected]>
Sent: Sunday, January 24, 2021 11:41 AM
To: [email protected] <[email protected]>
Subject: Re: [python] not an arrow file
Can you show your C++ code?
On Sun, Jan 24, 2021 at 8:10 AM Teh, Kenneth M.
<[email protected]<mailto:[email protected]>> wrote:
Just started with arrow...
I wrote a record batch to a file using ipc::MakeFileWriter to create a writer
and writer->WriteRecordBatch in a C++ program and tried to read it in python
with:
[] import pyarrow as pa
[] reader = pa.ipc.open_file("myfile")
It raises the ArrowInvalid with the message "not an arrow file".
If I write it out as a Table in feather format, I can read it in python. But I
want to write large files on the order of 100GB or more and then read them back
into python as pandas dataframes or something similar.
So, I switched to using an ipc writer.
Can something point me in the right direction? Thanks.
Ken
#include <fmt/format.h>
#include <arrow/status.h>
#include <arrow/result.h>
#include <arrow/array.h>
#include <arrow/builder.h>
#include <arrow/record_batch.h>
#include <arrow/ipc/writer.h>
#include <arrow/io/file.h>
#include "ran3.h"
arrow::Result<std::shared_ptr<arrow::RecordBatch>> make_batch(
const std::shared_ptr<arrow::Schema> &schema,
const int64_t nrows)
{
using func_return_type = arrow::Result<std::shared_ptr<
arrow::RecordBatch>>;
arrow::Status st;
// All schema fields are conveniently int32_t's.
std::vector<int32_t> v(nrows, 0);
const int narrays = schema->num_fields();
std::vector<std::shared_ptr<arrow::Array>> columns(narrays);
for (int i = 0; i < narrays; ++i) {
arrow::Result<std::shared_ptr<arrow::Array>> res;
arrow::Int32Builder b;
for (int j = 0; j < nrows; ++j) v[j] = ran3(0);
b.Reset();
st = b.AppendValues(v);
if (not st.ok()) return func_return_type(st);
st = b.Finish(&columns[i]);
if (not st.ok()) return func_return_type(st);
}
auto rb = arrow::RecordBatch::Make(schema, nrows, columns);
return func_return_type(rb);
}
int main(int argc, char *argv[])
{
int nrecs = 1;
if (argc > 1) nrecs = std::stoi(argv[1]);
/* Make a schema for our record batches.
*/
std::shared_ptr<arrow::Field> adc1, adc2;
std::shared_ptr<arrow::Schema> schema;
adc1 = arrow::field("adc1", arrow::int32());
adc2 = arrow::field("adc2", arrow::int32());
schema = arrow::schema({adc1, adc2});
/* Make and IPC writer so we can write record batches.
*/
const std::string fnm = "writing.arrow";
auto fres = arrow::io::FileOutputStream::Open(fnm);
if (not fres.ok()) {
fmt::print("open file failed, <{}>, \"{}\"\n",
fres.status().code(), fres.status().message());
return 1;
}
auto fout = fres.ValueOrDie();
auto wres = arrow::ipc::MakeFileWriter(fout, schema);
if (not wres.ok()) {
fmt::print("make ipc writer failed, <{}>, \"{}\"\n",
wres.status().code(), wres.status().message());
return 1;
}
auto writer = wres.ValueOrDie();
const int64_t nrows = 500;
ran3(12345);
for (int i = 0; i < nrecs; ++i) {
arrow::Result<std::shared_ptr<arrow::RecordBatch>> bres;
std::shared_ptr<arrow::RecordBatch> rb;
arrow::Status st;
bres = make_batch(schema, nrows);
if (not bres.ok()) {
fmt::print("make_batch failed on i={}, <{}>, \"{}\"\n",
i, bres.status().code(), bres.status().message());
break;
}
rb = bres.ValueOrDie();
st = writer->WriteRecordBatch(*rb);
if (not st.ok()) {
fmt::print("write batch {} failed, <{}>, \"{}\"\n",
i, st.code(), st.message());
break;
}
}
static_cast<void>(fout->Close());
return 0;
}