vicennial commented on code in PR #52496:
URL: https://github.com/apache/spark/pull/52496#discussion_r2460250048
##########
sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala:
##########
@@ -151,55 +156,96 @@ private[sql] class SparkResult[T](
stop |= stopOnSchema
}
if (response.hasArrowBatch) {
- val ipcStreamBytes = response.getArrowBatch.getData
- val expectedNumRows = response.getArrowBatch.getRowCount
- val reader = new MessageIterator(ipcStreamBytes.newInput(), allocator)
- if (arrowSchema == null) {
- arrowSchema = reader.schema
- stop |= stopOnArrowSchema
- } else if (arrowSchema != reader.schema) {
- throw new IllegalStateException(
- s"""Schema Mismatch between expected and received schema:
- |=== Expected Schema ===
- |$arrowSchema
- |=== Received Schema ===
- |${reader.schema}
- |""".stripMargin)
- }
- if (structType == null) {
- // If the schema is not available yet, fallback to the arrow schema.
- structType = ArrowUtils.fromArrowSchema(reader.schema)
- }
- if (response.getArrowBatch.hasStartOffset) {
- val expectedStartOffset = response.getArrowBatch.getStartOffset
- if (numRecords != expectedStartOffset) {
+ val arrowBatch = response.getArrowBatch
+ logDebug(
+ s"Received arrow batch rows=${arrowBatch.getRowCount} " +
+ s"Number of chunks in batch=${arrowBatch.getNumChunksInBatch} " +
+ s"Chunk index=${arrowBatch.getChunkIndex} " +
+ s"size=${arrowBatch.getData.size()}")
+
+ if (arrowBatchChunksToAssemble.nonEmpty) {
+ // Expect next chunk of the same batch
+ if (arrowBatch.getChunkIndex != arrowBatchChunksToAssemble.size) {
throw new IllegalStateException(
- s"Expected arrow batch to start at row offset $numRecords in
results, " +
- s"but received arrow batch starting at offset
$expectedStartOffset.")
+ s"Expected chunk index ${arrowBatchChunksToAssemble.size} of the
" +
+ s"arrow batch but got ${arrowBatch.getChunkIndex}.")
}
Review Comment:
Since these are user-facing exceptions, should we use a structured
error state/code here instead of a plain IllegalStateException?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]