Re: how to make acero output order by batch index

2023-07-26 Weston Pace
> Replacing ... with ... works as expected

This is, I think, because the RecordBatchSourceNode defaults to implicit ordering (note the RecordBatchSourceNode is a SchemaSourceNode):

```
struct SchemaSourceNode : public SourceNode {
  SchemaSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
```
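To make the ordered/unordered distinction concrete, here is a toy model in plain C++ with no Arrow dependency. `SketchSource`, `Batch`, and the `index` field are hypothetical names, not Acero's API. The sketch assumes, for illustration only, that "implicit ordering" means the source numbers batches in the order it pulls them from its iterator, while an unordered source attaches no index at all.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an Acero batch: a payload plus an optional index.
struct Batch {
  std::string payload;
  std::int64_t index = -1;  // -1 means "unordered": no index was assigned
};

// Hypothetical source node: pulls items from a generator one at a time.
// With implicit ordering it stamps each batch with the order it was pulled in;
// without it, batches carry no index and downstream nodes cannot restore order.
class SketchSource {
 public:
  SketchSource(std::function<std::optional<std::string>()> next,
               bool implicit_ordering)
      : next_(std::move(next)), implicit_ordering_(implicit_ordering) {}

  // Returns std::nullopt once the generator is exhausted.
  std::optional<Batch> Pull() {
    std::optional<std::string> item = next_();
    if (!item) return std::nullopt;
    Batch batch{*item};
    if (implicit_ordering_) batch.index = counter_++;
    return batch;
  }

  std::vector<Batch> DrainAll() {
    std::vector<Batch> out;
    while (std::optional<Batch> b = Pull()) {
      // Implicit ordering yields dense indices 0, 1, 2, ... in pull order.
      assert(!implicit_ordering_ ||
             b->index == static_cast<std::int64_t>(out.size()));
      out.push_back(std::move(*b));
    }
    return out;
  }

 private:
  std::function<std::optional<std::string>()> next_;
  bool implicit_ordering_;
  std::int64_t counter_ = 0;
};
```

With `implicit_ordering` set to false every batch keeps index -1, which models why a sink downstream of an unordered source has nothing to sort by.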

Re: how to make acero output order by batch index

2023-07-26 Weston Pace
> I think the key problem is that the input stream is unordered. The input stream is an ArrowArrayStream imported from the Python side and then declared to a "record_batch_reader_source", which is an unordered source node. So the behavior is expected.
> I think the

Re: how to make acero output order by batch index

2023-07-25 Wenbo Hu
Replacing

```
ac::Declaration source{"record_batch_reader_source",
                       ac::RecordBatchReaderSourceNodeOptions{std::move(input)}};
```

with

```
ac::RecordBatchSourceNodeOptions rb_source_options{
    input->schema(), [input]() {
      return arrow::MakeFunctionIterator([input] { return input->Next(); });
    }};
```

Re: how to make acero output order by batch index

2023-07-25 Wenbo Hu
Hi, I'll open an issue on the DeclarationToReader problem. I think the key problem is that the input stream is unordered. The input stream is an ArrowArrayStream imported from the Python side and then declared to a "record_batch_reader_source", which is an unordered source node. So the behavior is

Re: how to make acero output order by batch index

2023-07-25 Weston Pace
> Reading the source code of exec_plan.cc, DeclarationToReader calls DeclarationToRecordBatchGenerator, which ignores the sequence_output parameter in SinkNodeOptions. It also calls validate, which should fail if the SinkNodeOptions honors sequence_output. Then it seems that

Re: how to make acero output order by batch index

2023-07-25 Wenbo Hu
Reading the source code of exec_plan.cc, DeclarationToReader calls DeclarationToRecordBatchGenerator, which ignores the sequence_output parameter in SinkNodeOptions. It also calls validate, which should fail if the SinkNodeOptions honors sequence_output. Then it seems that DeclarationToReader
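For intuition about what honoring sequence_output involves, here is a minimal sketch of a resequencing buffer in plain C++. `Resequencer` is a hypothetical class, not Acero's implementation; it only models the idea of holding back out-of-order batches until the gap before them is filled. It assumes every batch already carries an index, which is exactly what an unordered source such as "record_batch_reader_source" does not provide.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical resequencer: accepts (index, payload) pairs in any order and
// releases payloads strictly in index order, buffering until gaps are filled.
class Resequencer {
 public:
  // Inserts one batch and returns every payload that can now be emitted.
  std::vector<std::string> Push(std::int64_t index, std::string payload) {
    assert(index >= next_ && "index already emitted");
    bool inserted = pending_.emplace(index, std::move(payload)).second;
    assert(inserted && "duplicate batch index");
    (void)inserted;  // avoid an unused-variable warning under NDEBUG
    std::vector<std::string> ready;
    // Drain the contiguous run starting at the next expected index.
    for (auto it = pending_.find(next_); it != pending_.end();
         it = pending_.find(next_)) {
      ready.push_back(std::move(it->second));
      pending_.erase(it);
      ++next_;
    }
    return ready;
  }

  // Number of batches still waiting for an earlier index to arrive.
  std::size_t buffered() const { return pending_.size(); }

 private:
  std::map<std::int64_t, std::string> pending_;
  std::int64_t next_ = 0;
};
```

Pushing indices 2, 1, 0 in that order releases nothing until index 0 arrives, then emits "a", "b", "c" together. The cost of this design is memory: a slow early batch forces every later batch to be buffered.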

how to make acero output order by batch index

2023-07-25 Wenbo Hu
Hi, I'm trying to zip two streams that share the same order but go through different processing. For example, the original stream comes with two columns, 'id' and 'age', and is split into two streams processed separately using Acero: 1. hash 'id' into a stream with a single column 'bucket_id', and 2. classify
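One way to zip the two branches back together, assuming each output batch still carries the index of the source batch it came from, is to pair batches by that index rather than by arrival order. The sketch below is plain C++ with hypothetical types (`BucketBatch`, `ClassBatch`, `ZippedBatch`, `ZipByIndex`); it models the idea only and does not use Acero.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical branch outputs: each batch keeps the index of the source batch
// it was derived from, plus the single column that branch produced.
struct BucketBatch {
  std::int64_t index;
  std::vector<std::int64_t> bucket_id;  // e.g. a hash of 'id'
};
struct ClassBatch {
  std::int64_t index;
  std::vector<std::string> label;  // e.g. a class derived from 'age'
};
struct ZippedBatch {
  std::int64_t index;
  std::vector<std::int64_t> bucket_id;
  std::vector<std::string> label;
};

// Pairs batches from the two branches by index, whatever order they arrived
// in, and returns the zipped batches sorted by index. Assumes both branches
// are row-preserving (no filtering or reordering within a batch).
std::vector<ZippedBatch> ZipByIndex(std::vector<BucketBatch> left,
                                    std::vector<ClassBatch> right) {
  std::map<std::int64_t, ClassBatch> by_index;
  for (ClassBatch& b : right) {
    bool inserted = by_index.emplace(b.index, std::move(b)).second;
    assert(inserted && "duplicate index in right branch");
    (void)inserted;
  }
  std::map<std::int64_t, ZippedBatch> zipped;  // kept ordered by index
  for (BucketBatch& l : left) {
    auto it = by_index.find(l.index);
    if (it == by_index.end())
      throw std::invalid_argument("no matching batch in right branch");
    if (it->second.label.size() != l.bucket_id.size())
      throw std::invalid_argument("row counts differ for the same index");
    zipped.emplace(l.index, ZippedBatch{l.index, std::move(l.bucket_id),
                                        std::move(it->second.label)});
  }
  std::vector<ZippedBatch> out;
  for (auto& entry : zipped) out.push_back(std::move(entry.second));
  return out;
}
```

Keying on the index instead of arrival order keeps the zip correct even when one branch finishes its batches faster than the other; if either branch drops or reorders rows, positional alignment within a batch no longer holds.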