David Wilcox created ARROW-7383:
-----------------------------------

             Summary: Cannot Read In Java/Scala Streaming Arrow Files Generated In C++
                 Key: ARROW-7383
                 URL: https://issues.apache.org/jira/browse/ARROW-7383
             Project: Apache Arrow
          Issue Type: Bug
          Components: C++, Java
            Reporter: David Wilcox
         Attachments: cpparrow-stream.arrow, java-arrow-stream.arrow

I'm working on a project that shuttles data back and forth between a Java 
process and a C++ process. I can read the Java stream object in C++, but I 
cannot read the C++ stream object in my Java project. When I try, the message 
fails to deserialize:
{code:java}
Exception in thread "main" java.lang.IllegalArgumentException
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
    at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$3.readNextBatch(ArrowConverters.scala:243)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$3.<init>(ArrowConverters.scala:229)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$.getBatchesFromStream(ArrowConverters.scala:228)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anonfun$readArrowStreamFromFile$2.apply(ArrowConverters.scala:216)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anonfun$readArrowStreamFromFile$2.apply(ArrowConverters.scala:214)
    at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2543)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$.readArrowStreamFromFile(ArrowConverters.scala:214)
    at org.apache.spark.sql.execution.arrow.MyArrowReader.rawRead(MyArrowReader.scala:23)
    at com.adobe.compute.PlatformGet$.main(PlatformGet.scala:121)
    at com.adobe.compute.PlatformGet.main(PlatformGet.scala)
{code}
I looked at the code in MessageSerializer.java, and it appears to expect the 
message length to be stored in the first four bytes of the file. Those four 
bytes are {{[-1, -1, -1, -1]}} in the C++ file, but not in the Java file. I do 
not know much about the internals of the Arrow streaming file format, so I 
can't say what they are supposed to be.
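
For context, {{[-1, -1, -1, -1]}} is how the bytes 0xFF 0xFF 0xFF 0xFF print as signed Java bytes, and read as a 32-bit integer they give -1, which would explain why ByteBuffer.allocate rejects the size. This matches the 0xFFFFFFFF continuation marker that Arrow 0.15.0 places in front of the message length, though whether that is the cause here is an assumption. A minimal C++ sketch for inspecting the prefix of either file follows; the helper names ReadPrefix and PrefixAsInt32 are hypothetical and not part of Arrow.

```cpp
#include <array>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <fstream>
#include <string>

// Hypothetical helper: read the first four bytes of the file at `path`.
// Returns false if the file cannot be opened or is shorter than four bytes.
inline bool ReadPrefix(const std::string& path,
                       std::array<unsigned char, 4>* out) {
  assert(out != nullptr);
  std::ifstream in(path, std::ios::binary);
  if (!in) return false;
  in.read(reinterpret_cast<char*>(out->data()),
          static_cast<std::streamsize>(out->size()));
  return in.gcount() == 4;
}

// Hypothetical helper: interpret four bytes as a little-endian signed 32-bit
// integer, the way a reader would if it treated them as a length prefix.
inline int32_t PrefixAsInt32(const std::array<unsigned char, 4>& b) {
  const uint32_t u = static_cast<uint32_t>(b[0]) |
                     (static_cast<uint32_t>(b[1]) << 8) |
                     (static_cast<uint32_t>(b[2]) << 16) |
                     (static_cast<uint32_t>(b[3]) << 24);
  int32_t v;
  std::memcpy(&v, &u, sizeof v);  // reinterpret the bits as signed
  return v;
}
```

Calling ReadPrefix on /tmp/cpparrow-stream.arrow and passing the result to PrefixAsInt32 should give -1 if the file starts with the four bytes reported above, while a file that starts directly with a length should give a positive value.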

I can read the streaming file created in Java back in Java, so Java is 
compatible with itself. The C++ code can read both the Java streaming file and 
the C++ streaming file.

Here is my C++ program, which reads the Java streaming file and writes a 
streaming file of its own.

 
{code:java}
#include <cstdlib>
#include <iostream>
#include <memory>
#include <vector>

#include <arrow/api.h>
#include <arrow/io/api.h>
#include <arrow/ipc/api.h>

// Drain every record batch from the reader and assemble them into one table.
arrow::Status Table__from_RecordBatchStreamReader(
        const std::shared_ptr<arrow::ipc::RecordBatchReader>& reader,
        std::shared_ptr<::arrow::Table>* table) {
    std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
    std::shared_ptr<arrow::RecordBatch> cur;
    do {
        ARROW_RETURN_NOT_OK(reader->ReadNext(&cur));
        if (cur)
            batches.push_back(cur);
    } while (cur);
    return arrow::Table::FromRecordBatches(batches, table);
}

#define EXIT_ON_FAILURE(expr)                      \
  do {                                             \
    arrow::Status status_ = (expr);                \
    if (!status_.ok()) {                           \
      std::cerr << status_.message() << std::endl; \
      return EXIT_FAILURE;                         \
    }                                              \
  } while (0);

// Print the status if it is an error; execution continues either way.
void checkStatus(arrow::Status st) {
    if (st.ok())
        return;
    std::cout << st.ToString() << std::endl;
}

int main(int, char** argv) {
    (void)argv;

    // Read the stream file written by the Java process into a table.
    std::shared_ptr<arrow::io::InputStream> infile
        = *arrow::io::MemoryMappedFile::Open("/tmp/java-arrow-stream.arrow",
                                             arrow::io::FileMode::READ);
    std::shared_ptr<arrow::ipc::RecordBatchReader> reader;
    checkStatus(
            arrow::ipc::RecordBatchStreamReader::Open(infile, &reader));
    std::shared_ptr<arrow::Table> table;
    checkStatus(Table__from_RecordBatchStreamReader(reader, &table));

    // Write the same table back out as a new stream file from C++.
    auto schema = reader->schema();
    std::shared_ptr<arrow::io::OutputStream> outputStream;
    outputStream = *arrow::io::FileOutputStream::Open(
            "/tmp/cpparrow-stream.arrow");
    std::shared_ptr<arrow::ipc::RecordBatchWriter> recordBatchStreamWriter;
    checkStatus(arrow::ipc::RecordBatchStreamWriter::Open(
                outputStream.get(), schema, &recordBatchStreamWriter));
    checkStatus(recordBatchStreamWriter->WriteTable(*table));
    return 0;
}
{code}
 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
