uros-db commented on code in PR #53227: URL: https://github.com/apache/spark/pull/53227#discussion_r2637875048
########## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/GeometryModel.java: ########## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.util.geo; + +/** + * Abstract base class for specific geometry types (Point, LineString, Polygon, etc.). + * This class provides common functionality needed by geometry subclasses. + */ +public abstract class GeometryModel { + + /** GeometryModel internal implementation. */ + + // Geometry type and SRID information Review Comment: Can we add some additional information for developers in this comment? E.g., I would like to explain a bit better how the geo type ID here relates to the OGC WKB base codes (1-7). ########## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/GeometryCollection.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.util.geo; + +import java.util.List; + +/** + * Represents a GeometryCollection geometry. + */ +class GeometryCollection extends GeometryModel { + private final List<GeometryModel> geometries; + private final boolean hasZ; + private final boolean hasM; + + GeometryCollection(List<GeometryModel> geometries, int srid, boolean hasZ, boolean hasM) { + super(GeoTypeId.GEOMETRY_COLLECTION, srid); + this.geometries = geometries; + this.hasZ = hasZ; + this.hasM = hasM; + } + + List<GeometryModel> getGeometries() { + return geometries; + } + + int getNumGeometries() { + return geometries.size(); + } + + @Override + boolean isEmpty() { + return geometries.isEmpty() || geometries.stream().allMatch(GeometryModel::isEmpty); + } + + @Override + int getDimensionCount() { + return 2 + (hasZ ? 1 : 0) + (hasM ? 1 : 0); + } + Review Comment: This is repeated 7 times across the classes. Can we move it to `GeometryModel`? ########## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/Point.java: ########## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.util.geo; + +import java.util.Arrays; + +/** + * Represents a point geometry with coordinates. + */ +class Point extends GeometryModel { + private final double[] coordinates; + private final boolean is_Empty; Review Comment: This goes here, and below. ########## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geometry.java: ########## @@ -77,6 +80,9 @@ public Geometry copy() { // Returns a Geometry object with the specified SRID value by parsing the input WKB. public static Geometry fromWkb(byte[] wkb, int srid) { + WkbReader reader = new WkbReader(); + reader.read(wkb); // Validate WKB Review Comment: Nice, @szehon-ho this is exactly the separation of concerns that should exist between `Geometry` and `WkbReader`. ########## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/WkbWriter.java: ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.util.geo; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +/** + * Utility class for converting geometries to Well-Known Binary (WKB) format. + * This class implements the OGC Simple Features specification for WKB writing. + * <p> + * This class is designed to support reuse of a single instance to write multiple + * geometries efficiently. This class is NOT thread-safe; each thread should create + * its own instance. + */ +public class WkbWriter { + + /** + * Gets the WKB type code for a geometry, including dimension offset. + * For example: Point 2D = 1, Point Z = 1001, Point M = 2001, Point ZM = 3001 + */ + private static int getWkbType(GeometryModel geometry) { + int baseType = (int) geometry.getTypeId().getValue(); + boolean hasZ = geometry.hasZ(); + boolean hasM = geometry.hasM(); + + // Determine dimension offset based on hasZ and hasM flags + if (hasZ && hasM) { + return baseType + WkbConstants.DIM_OFFSET_ZM; + } else if (hasZ) { + return baseType + WkbConstants.DIM_OFFSET_Z; + } else if (hasM) { + return baseType + WkbConstants.DIM_OFFSET_M; + } else { + return baseType + WkbConstants.DIM_OFFSET_2D; + } + } + + // ========== Public Write Methods ========== + + /** + * Writes a geometry to WKB format. + */ + public byte[] write(GeometryModel geometry) { + return write(geometry, ByteOrder.LITTLE_ENDIAN); + } + + /** + * Writes a geometry to WKB format with specified byte order. + * <p> Review Comment: What is the `<p>` tag intended for? 
########## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/GeometryModel.java: ########## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.util.geo; + +/** + * Abstract base class for specific geometry types (Point, LineString, Polygon, etc.). + * This class provides common functionality needed by geometry subclasses. + */ +public abstract class GeometryModel { + + /** GeometryModel internal implementation. */ + + // Geometry type and SRID information + protected GeoTypeId typeId; + protected int sridValue; + + /** GeometryModel constants. */ + + // The default SRID value for GEOMETRY values. + static int DEFAULT_SRID = 0; Review Comment: Do we need this at all? Can we always force callers to pass their SRID instead? I think this would make more sense, given that the geometry model abstraction is logically separate from other parts of the system. ########## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/WkbReader.java: ########## @@ -0,0 +1,475 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.util.geo; + + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.spark.sql.catalyst.util.geo.WkbConstants.DEFAULT_SRID; + +/** + * Reader for parsing Well-Known Binary (WKB) format geometries. + * This class implements the OGC Simple Features specification for WKB parsing. + */ +public class WkbReader { + private ByteBuffer buffer; + private final int validationLevel; + private byte[] currentWkb; + + public WkbReader() { + this(1); + } + + public WkbReader(int validationLevel) { + this.validationLevel = validationLevel; + } + + // ========== Coordinate Validation Helpers ========== + + /** + * Returns true if the coordinate value is valid for a non-empty point. + * A valid coordinate is finite (not NaN and not Infinity). + */ + private static boolean isValidCoordinate(double value) { + return Double.isFinite(value); + } + + /** + * Returns true if the coordinate value is valid for a point that may be empty. + * A valid coordinate is either finite or NaN (for empty points). + * Infinity values are not allowed. 
+ */ + private static boolean isValidCoordinateAllowEmpty(double value) { + return Double.isFinite(value) || Double.isNaN(value); + } + + /** + * Reads a geometry from WKB bytes. + */ + public GeometryModel read(byte[] wkb) { + try { + currentWkb = wkb; + return readGeometry(DEFAULT_SRID); + } finally { + // Clear references to allow garbage collection + buffer = null; + currentWkb = null; + } + } + + /** + * Reads a geometry from WKB bytes with a specified SRID. + */ + public GeometryModel read(byte[] wkb, int srid) { + try { + currentWkb = wkb; + return readGeometry(srid); + } finally { + // Clear references to allow garbage collection + buffer = null; + currentWkb = null; + } + } + + private void checkNotAtEnd(long pos) { + if (buffer.position() >= buffer.limit()) { + throw new WkbParseException("Unexpected end of WKB buffer", pos, currentWkb); + } + } + + private ByteOrder readEndianness() { + checkNotAtEnd(buffer.position()); + byte endianValue = buffer.get(); + if (endianValue != WkbConstants.BIG_ENDIAN && endianValue != WkbConstants.LITTLE_ENDIAN) { + throw new WkbParseException("Invalid byte order " + endianValue, buffer.position() - 1, + currentWkb); + } + return endianValue == WkbConstants.LITTLE_ENDIAN ? + ByteOrder.LITTLE_ENDIAN : ByteOrder.BIG_ENDIAN; + } + + private int readInt() { + if (buffer.remaining() < WkbConstants.INT_SIZE) { + checkNotAtEnd(buffer.limit()); + } + return buffer.getInt(); + } + + /** + * Reads a double coordinate value, allowing NaN for empty points. + */ + private double readDoubleAllowEmpty() { + if (buffer.remaining() < WkbConstants.DOUBLE_SIZE) { + checkNotAtEnd(buffer.limit()); + } + double value = buffer.getDouble(); + if (!isValidCoordinateAllowEmpty(value)) { + throw new WkbParseException("Invalid coordinate value found", buffer.position() - 8, + currentWkb); + } + return value; + } + + /** + * Reads a double coordinate value, not allowing NaN (for non-point coordinates like rings). 
+ */ + private double readDoubleNoEmpty() { + if (buffer.remaining() < WkbConstants.DOUBLE_SIZE) { + checkNotAtEnd(buffer.limit()); + } + double value = buffer.getDouble(); + if (!isValidCoordinate(value)) { + throw new WkbParseException("Invalid coordinate value found", buffer.position() - 8, + currentWkb); + } + return value; + } + + /** + * Reads a geometry from WKB bytes with a specified SRID. + * + * @param defaultSrid srid to use if not specified in WKB + * @return Geometry object + */ + private GeometryModel readGeometry(int defaultSrid) { + // Check that we have at least one byte for endianness + if (currentWkb == null || currentWkb.length < 1) { + throw new WkbParseException("WKB data is empty or null", 0, currentWkb); + } + + // Read endianness directly from the first byte + byte endianValue = currentWkb[0]; + if (endianValue > 1) { + throw new WkbParseException("Invalid byte order " + endianValue, 0, currentWkb); + } + ByteOrder byteOrder = endianValue == 1 ? ByteOrder.LITTLE_ENDIAN : ByteOrder.BIG_ENDIAN; + + // Check that we have enough bytes for the rest of the data + if (currentWkb.length < 5) { + throw new WkbParseException("WKB data too short", 0, currentWkb); + } + + // Create a new buffer wrapping the rest of the byte array (after the endianness byte) + buffer = ByteBuffer.wrap(currentWkb, 1, currentWkb.length - 1); + buffer.order(byteOrder); + + return readGeometryInternal(defaultSrid, true); + } + + /** + * Reads a nested geometry and validates that its dimensions match the parent's dimensions. + * The check happens immediately after reading the type, before reading coordinate data. + */ + private GeometryModel readNestedGeometryWithDimensionCheck( + int defaultSrid, + boolean expectedHasZ, + boolean expectedHasM) { + return readNestedGeometryInternal(defaultSrid, true, expectedHasZ, expectedHasM); + } + + /** + * Internal method to read a nested geometry with optional dimension validation. 
+ */ + private GeometryModel readNestedGeometryInternal( + int defaultSrid, + boolean validateDimensions, + boolean expectedHasZ, + boolean expectedHasM) { + // Read endianness from the current buffer position + ByteOrder byteOrder = readEndianness(); + + // Save the current byte order and temporarily set to the nested geometry's byte order + ByteOrder savedByteOrder = buffer.order(); + buffer.order(byteOrder); + long typeStartPos = buffer.position(); + + // Read type and dimension + int typeAndDim = readInt(); + + // Parse WKB format (geotype and dimension) + if (!WkbConstants.isValidWkbType(typeAndDim)) { + throw new WkbParseException("Invalid or unsupported type " + typeAndDim, typeStartPos, + currentWkb); + } + + int baseType = WkbConstants.getBaseType(typeAndDim); + GeoTypeId geoType = GeoTypeId.fromValue(baseType); + int dimensionCount = WkbConstants.getDimensionCount(typeAndDim); + boolean hasZ = WkbConstants.hasZ(typeAndDim); + boolean hasM = WkbConstants.hasM(typeAndDim); + + // Validate dimensions match parent if required + if (validateDimensions && (hasZ != expectedHasZ || hasM != expectedHasM)) { + throw new WkbParseException( + "Invalid or unsupported type " + typeAndDim, typeStartPos, currentWkb); + } + + // Dispatch to appropriate reader based on geometry type + GeometryModel result = readGeometryData(geoType, defaultSrid, dimensionCount, hasZ, hasM, + typeStartPos); + + // Restore the saved byte order + buffer.order(savedByteOrder); + + return result; + } + + /** + * Internal method to read geometry with optional endianness reading. + * + * @param defaultSrid srid to use if not specified in WKB + * @param isRootGeometry if true, assumes endianness has already been read and buffer is set up; + * if false, reads endianness from current buffer position + * @return Geometry object + */ + private GeometryModel readGeometryInternal(int defaultSrid, boolean isRootGeometry) { Review Comment: Do we ever call this with isRootGeometry=false? 
########## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geometry.java: ########## @@ -120,17 +126,20 @@ public static Geometry fromEwkt(byte[] ewkt) { public byte[] toWkb() { // This method returns only the WKB portion of the in-memory Geometry representation. // Note that the header is skipped, and that the WKB is returned as-is (little-endian). - return Arrays.copyOfRange(getBytes(), WKB_OFFSET, getBytes().length); + return toWkbInternal(DEFAULT_ENDIANNESS); } @Override public byte[] toWkb(ByteOrder endianness) { - // The default endianness is Little Endian (NDR). - if (endianness == DEFAULT_ENDIANNESS) { - return toWkb(); - } else { - throw new UnsupportedOperationException("Geometry WKB endianness is not yet supported."); - } + return toWkbInternal(endianness); + } + + private byte[] toWkbInternal(ByteOrder endianness) { Review Comment: Here, we do validation for any given endianness. I think this is fine for now, and that we should actually proceed like this. However, we can just keep in mind that an optimization may be possible here if we decide to skip validation on write (and just assume that it happened on read). Then, we could potentially "short-circuit" and make the write a bit faster. ########## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/WkbWriter.java: ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.util.geo; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +/** + * Utility class for converting geometries to Well-Known Binary (WKB) format. + * This class implements the OGC Simple Features specification for WKB writing. + * <p> Review Comment: What is the `<p>` tag intended for? ########## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/WkbReader.java: ########## @@ -0,0 +1,475 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.catalyst.util.geo; + + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.spark.sql.catalyst.util.geo.WkbConstants.DEFAULT_SRID; + +/** + * Reader for parsing Well-Known Binary (WKB) format geometries. + * This class implements the OGC Simple Features specification for WKB parsing. + */ +public class WkbReader { + private ByteBuffer buffer; + private final int validationLevel; + private byte[] currentWkb; + + public WkbReader() { + this(1); + } + + public WkbReader(int validationLevel) { + this.validationLevel = validationLevel; + } + + // ========== Coordinate Validation Helpers ========== + + /** + * Returns true if the coordinate value is valid for a non-empty point. + * A valid coordinate is finite (not NaN and not Infinity). + */ + private static boolean isValidCoordinate(double value) { + return Double.isFinite(value); + } + + /** + * Returns true if the coordinate value is valid for a point that may be empty. + * A valid coordinate is either finite or NaN (for empty points). + * Infinity values are not allowed. + */ + private static boolean isValidCoordinateAllowEmpty(double value) { + return Double.isFinite(value) || Double.isNaN(value); + } + + /** + * Reads a geometry from WKB bytes. + */ + public GeometryModel read(byte[] wkb) { + try { + currentWkb = wkb; + return readGeometry(DEFAULT_SRID); + } finally { + // Clear references to allow garbage collection + buffer = null; + currentWkb = null; + } + } + + /** + * Reads a geometry from WKB bytes with a specified SRID. 
+ */ + public GeometryModel read(byte[] wkb, int srid) { + try { + currentWkb = wkb; + return readGeometry(srid); + } finally { + // Clear references to allow garbage collection + buffer = null; + currentWkb = null; + } + } + + private void checkNotAtEnd(long pos) { + if (buffer.position() >= buffer.limit()) { + throw new WkbParseException("Unexpected end of WKB buffer", pos, currentWkb); + } + } + + private ByteOrder readEndianness() { + checkNotAtEnd(buffer.position()); + byte endianValue = buffer.get(); + if (endianValue != WkbConstants.BIG_ENDIAN && endianValue != WkbConstants.LITTLE_ENDIAN) { + throw new WkbParseException("Invalid byte order " + endianValue, buffer.position() - 1, + currentWkb); + } + return endianValue == WkbConstants.LITTLE_ENDIAN ? + ByteOrder.LITTLE_ENDIAN : ByteOrder.BIG_ENDIAN; + } + + private int readInt() { + if (buffer.remaining() < WkbConstants.INT_SIZE) { + checkNotAtEnd(buffer.limit()); + } + return buffer.getInt(); + } + + /** + * Reads a double coordinate value, allowing NaN for empty points. + */ + private double readDoubleAllowEmpty() { + if (buffer.remaining() < WkbConstants.DOUBLE_SIZE) { + checkNotAtEnd(buffer.limit()); + } + double value = buffer.getDouble(); + if (!isValidCoordinateAllowEmpty(value)) { + throw new WkbParseException("Invalid coordinate value found", buffer.position() - 8, + currentWkb); + } + return value; + } + + /** + * Reads a double coordinate value, not allowing NaN (for non-point coordinates like rings). + */ + private double readDoubleNoEmpty() { + if (buffer.remaining() < WkbConstants.DOUBLE_SIZE) { + checkNotAtEnd(buffer.limit()); + } + double value = buffer.getDouble(); + if (!isValidCoordinate(value)) { + throw new WkbParseException("Invalid coordinate value found", buffer.position() - 8, + currentWkb); + } + return value; + } + + /** + * Reads a geometry from WKB bytes with a specified SRID. 
+ * + * @param defaultSrid srid to use if not specified in WKB + * @return Geometry object + */ + private GeometryModel readGeometry(int defaultSrid) { + // Check that we have at least one byte for endianness + if (currentWkb == null || currentWkb.length < 1) { + throw new WkbParseException("WKB data is empty or null", 0, currentWkb); + } + + // Read endianness directly from the first byte + byte endianValue = currentWkb[0]; + if (endianValue > 1) { + throw new WkbParseException("Invalid byte order " + endianValue, 0, currentWkb); + } + ByteOrder byteOrder = endianValue == 1 ? ByteOrder.LITTLE_ENDIAN : ByteOrder.BIG_ENDIAN; + + // Check that we have enough bytes for the rest of the data + if (currentWkb.length < 5) { + throw new WkbParseException("WKB data too short", 0, currentWkb); + } + + // Create a new buffer wrapping the rest of the byte array (after the endianness byte) + buffer = ByteBuffer.wrap(currentWkb, 1, currentWkb.length - 1); + buffer.order(byteOrder); + + return readGeometryInternal(defaultSrid, true); + } + + /** + * Reads a nested geometry and validates that its dimensions match the parent's dimensions. + * The check happens immediately after reading the type, before reading coordinate data. + */ + private GeometryModel readNestedGeometryWithDimensionCheck( + int defaultSrid, + boolean expectedHasZ, + boolean expectedHasM) { + return readNestedGeometryInternal(defaultSrid, true, expectedHasZ, expectedHasM); + } + + /** + * Internal method to read a nested geometry with optional dimension validation. 
+ */ + private GeometryModel readNestedGeometryInternal( + int defaultSrid, + boolean validateDimensions, + boolean expectedHasZ, + boolean expectedHasM) { + // Read endianness from the current buffer position + ByteOrder byteOrder = readEndianness(); + + // Save the current byte order and temporarily set to the nested geometry's byte order + ByteOrder savedByteOrder = buffer.order(); + buffer.order(byteOrder); + long typeStartPos = buffer.position(); + + // Read type and dimension + int typeAndDim = readInt(); + + // Parse WKB format (geotype and dimension) + if (!WkbConstants.isValidWkbType(typeAndDim)) { + throw new WkbParseException("Invalid or unsupported type " + typeAndDim, typeStartPos, + currentWkb); + } + + int baseType = WkbConstants.getBaseType(typeAndDim); + GeoTypeId geoType = GeoTypeId.fromValue(baseType); + int dimensionCount = WkbConstants.getDimensionCount(typeAndDim); + boolean hasZ = WkbConstants.hasZ(typeAndDim); + boolean hasM = WkbConstants.hasM(typeAndDim); + + // Validate dimensions match parent if required + if (validateDimensions && (hasZ != expectedHasZ || hasM != expectedHasM)) { + throw new WkbParseException( + "Invalid or unsupported type " + typeAndDim, typeStartPos, currentWkb); + } + + // Dispatch to appropriate reader based on geometry type + GeometryModel result = readGeometryData(geoType, defaultSrid, dimensionCount, hasZ, hasM, + typeStartPos); + + // Restore the saved byte order + buffer.order(savedByteOrder); + + return result; + } + + /** + * Internal method to read geometry with optional endianness reading. 
+ * + * @param defaultSrid srid to use if not specified in WKB + * @param isRootGeometry if true, assumes endianness has already been read and buffer is set up; + * if false, reads endianness from current buffer position + * @return Geometry object + */ + private GeometryModel readGeometryInternal(int defaultSrid, boolean isRootGeometry) { + ByteOrder savedByteOrder = null; + long typeStartPos; + + if (isRootGeometry) { Review Comment: Why don't we check the byte order (e.g. using `readEndianness`) in this conditional branch, for the root geom? ########## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/Ring.java: ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.util.geo; + +import java.util.List; + +/** + * Represents a ring (closed linestring) in a polygon. 
+ */ +class Ring { + private final List<Point> points; + private final boolean hasZ; + private final boolean hasM; + + Ring(List<Point> points, boolean hasZ, boolean hasM) { + this.points = points; + this.hasZ = hasZ; + this.hasM = hasM; + } + + List<Point> getPoints() { + return points; + } + + int getNumPoints() { + return points.size(); + } + + boolean isEmpty() { + return points.isEmpty(); + } + + int getDimensionCount() { + return 2 + (hasZ ? 1 : 0) + (hasM ? 1 : 0); + } + + boolean hasZ() { + return hasZ; + } + + boolean hasM() { + return hasM; + } + + boolean isClosed() { Review Comment: I think it's worth noting in a comment that this is in line with OGC WKB semantics, which require rings to be explicitly closed. ########## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/STUtils.java: ########## @@ -58,6 +63,17 @@ public static GeometryVal geographyToGeometry(GeographyVal geographyVal) { return toPhysVal(Geometry.fromBytes(geographyVal.getBytes())); } + public static void parseWKB(byte[] wkb, int srid) { + new WkbReader().read(wkb, srid); + } + + public static GeometryVal physicalValFromWKB(byte[] wkb, int srid) { + byte[] bytes = new byte[HEADER_SIZE + wkb.length]; + ByteBuffer.wrap(bytes).order(DEFAULT_ENDIANNESS).putInt(srid); + System.arraycopy(wkb, 0, bytes, WKB_OFFSET, wkb.length); + return GeometryVal.fromBytes(bytes); + } Review Comment: Why do we need this? I think it essentially duplicates the header+copy logic from `Geometry.fromWkb`. Also, `STUtils` shouldn't know about what's inside a `GeometryVal` byte buffer. We should interface through `Geometry` methods (fromWkb, toWkb). ########## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geometry.java: ########## @@ -120,17 +126,20 @@ public static Geometry fromEwkt(byte[] ewkt) { public byte[] toWkb() { // This method returns only the WKB portion of the in-memory Geometry representation. // Note that the header is skipped, and that the WKB is returned as-is (little-endian). 
Review Comment: Let's remove this comment, since it's no longer relevant. ########## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/WkbReader.java: ########## @@ -0,0 +1,475 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.util.geo; + + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.spark.sql.catalyst.util.geo.WkbConstants.DEFAULT_SRID; + +/** + * Reader for parsing Well-Known Binary (WKB) format geometries. + * This class implements the OGC Simple Features specification for WKB parsing. + */ +public class WkbReader { + private ByteBuffer buffer; + private final int validationLevel; + private byte[] currentWkb; + + public WkbReader() { + this(1); + } + + public WkbReader(int validationLevel) { + this.validationLevel = validationLevel; + } + + // ========== Coordinate Validation Helpers ========== + + /** + * Returns true if the coordinate value is valid for a non-empty point. + * A valid coordinate is finite (not NaN and not Infinity). 
+ */ + private static boolean isValidCoordinate(double value) { + return Double.isFinite(value); + } + + /** + * Returns true if the coordinate value is valid for a point that may be empty. + * A valid coordinate is either finite or NaN (for empty points). + * Infinity values are not allowed. + */ + private static boolean isValidCoordinateAllowEmpty(double value) { + return Double.isFinite(value) || Double.isNaN(value); + } + + /** + * Reads a geometry from WKB bytes. + */ + public GeometryModel read(byte[] wkb) { + try { + currentWkb = wkb; + return readGeometry(DEFAULT_SRID); + } finally { + // Clear references to allow garbage collection + buffer = null; + currentWkb = null; + } + } + + /** + * Reads a geometry from WKB bytes with a specified SRID. + */ + public GeometryModel read(byte[] wkb, int srid) { + try { + currentWkb = wkb; + return readGeometry(srid); + } finally { + // Clear references to allow garbage collection + buffer = null; + currentWkb = null; + } + } + + private void checkNotAtEnd(long pos) { + if (buffer.position() >= buffer.limit()) { + throw new WkbParseException("Unexpected end of WKB buffer", pos, currentWkb); + } + } + + private ByteOrder readEndianness() { + checkNotAtEnd(buffer.position()); + byte endianValue = buffer.get(); + if (endianValue != WkbConstants.BIG_ENDIAN && endianValue != WkbConstants.LITTLE_ENDIAN) { + throw new WkbParseException("Invalid byte order " + endianValue, buffer.position() - 1, + currentWkb); + } + return endianValue == WkbConstants.LITTLE_ENDIAN ? + ByteOrder.LITTLE_ENDIAN : ByteOrder.BIG_ENDIAN; + } + + private int readInt() { + if (buffer.remaining() < WkbConstants.INT_SIZE) { + checkNotAtEnd(buffer.limit()); + } + return buffer.getInt(); + } + + /** + * Reads a double coordinate value, allowing NaN for empty points. 
+ */ + private double readDoubleAllowEmpty() { + if (buffer.remaining() < WkbConstants.DOUBLE_SIZE) { + checkNotAtEnd(buffer.limit()); + } + double value = buffer.getDouble(); + if (!isValidCoordinateAllowEmpty(value)) { + throw new WkbParseException("Invalid coordinate value found", buffer.position() - 8, + currentWkb); + } + return value; + } + + /** + * Reads a double coordinate value, not allowing NaN (for non-point coordinates like rings). + */ + private double readDoubleNoEmpty() { + if (buffer.remaining() < WkbConstants.DOUBLE_SIZE) { + checkNotAtEnd(buffer.limit()); + } + double value = buffer.getDouble(); + if (!isValidCoordinate(value)) { + throw new WkbParseException("Invalid coordinate value found", buffer.position() - 8, + currentWkb); + } + return value; + } + + /** + * Reads a geometry from WKB bytes with a specified SRID. + * + * @param defaultSrid srid to use if not specified in WKB + * @return Geometry object + */ + private GeometryModel readGeometry(int defaultSrid) { + // Check that we have at least one byte for endianness + if (currentWkb == null || currentWkb.length < 1) { + throw new WkbParseException("WKB data is empty or null", 0, currentWkb); + } + + // Read endianness directly from the first byte + byte endianValue = currentWkb[0]; + if (endianValue > 1) { + throw new WkbParseException("Invalid byte order " + endianValue, 0, currentWkb); + } + ByteOrder byteOrder = endianValue == 1 ? ByteOrder.LITTLE_ENDIAN : ByteOrder.BIG_ENDIAN; + + // Check that we have enough bytes for the rest of the data + if (currentWkb.length < 5) { + throw new WkbParseException("WKB data too short", 0, currentWkb); + } + + // Create a new buffer wrapping the rest of the byte array (after the endianness byte) + buffer = ByteBuffer.wrap(currentWkb, 1, currentWkb.length - 1); + buffer.order(byteOrder); + + return readGeometryInternal(defaultSrid, true); + } + + /** + * Reads a nested geometry and validates that its dimensions match the parent's dimensions. 
+ * The check happens immediately after reading the type, before reading coordinate data. + */ + private GeometryModel readNestedGeometryWithDimensionCheck( + int defaultSrid, + boolean expectedHasZ, + boolean expectedHasM) { + return readNestedGeometryInternal(defaultSrid, true, expectedHasZ, expectedHasM); + } + + /** + * Internal method to read a nested geometry with optional dimension validation. + */ + private GeometryModel readNestedGeometryInternal( + int defaultSrid, + boolean validateDimensions, + boolean expectedHasZ, + boolean expectedHasM) { + // Read endianness from the current buffer position + ByteOrder byteOrder = readEndianness(); + + // Save the current byte order and temporarily set to the nested geometry's byte order + ByteOrder savedByteOrder = buffer.order(); + buffer.order(byteOrder); + long typeStartPos = buffer.position(); + + // Read type and dimension + int typeAndDim = readInt(); + + // Parse WKB format (geotype and dimension) + if (!WkbConstants.isValidWkbType(typeAndDim)) { + throw new WkbParseException("Invalid or unsupported type " + typeAndDim, typeStartPos, + currentWkb); + } + + int baseType = WkbConstants.getBaseType(typeAndDim); + GeoTypeId geoType = GeoTypeId.fromValue(baseType); + int dimensionCount = WkbConstants.getDimensionCount(typeAndDim); + boolean hasZ = WkbConstants.hasZ(typeAndDim); + boolean hasM = WkbConstants.hasM(typeAndDim); + + // Validate dimensions match parent if required + if (validateDimensions && (hasZ != expectedHasZ || hasM != expectedHasM)) { + throw new WkbParseException( + "Invalid or unsupported type " + typeAndDim, typeStartPos, currentWkb); + } + + // Dispatch to appropriate reader based on geometry type + GeometryModel result = readGeometryData(geoType, defaultSrid, dimensionCount, hasZ, hasM, + typeStartPos); + + // Restore the saved byte order + buffer.order(savedByteOrder); + + return result; + } + + /** + * Internal method to read geometry with optional endianness reading. 
+ * + * @param defaultSrid srid to use if not specified in WKB + * @param isRootGeometry if true, assumes endianness has already been read and buffer is set up; + * if false, reads endianness from current buffer position + * @return Geometry object + */ + private GeometryModel readGeometryInternal(int defaultSrid, boolean isRootGeometry) { + ByteOrder savedByteOrder = null; + long typeStartPos; + + if (isRootGeometry) { + // For root geometry, endianness has already been read and buffer is set up + typeStartPos = 1; + } else { + // For nested geometry, read endianness from the current buffer position + ByteOrder byteOrder = readEndianness(); + + // Save the current byte order and temporarily set to the nested geometry's byte order + savedByteOrder = buffer.order(); + buffer.order(byteOrder); + typeStartPos = buffer.position(); + } + + // Read type and dimension + int typeAndDim = readInt(); + + GeoTypeId geoType; + int dimensionCount; + + // Parse WKB format (geotype and dimension) + if (!WkbConstants.isValidWkbType(typeAndDim)) { + throw new WkbParseException("Invalid or unsupported type " + typeAndDim, typeStartPos, + currentWkb); + } + + int baseType = WkbConstants.getBaseType(typeAndDim); + geoType = GeoTypeId.fromValue(baseType); + dimensionCount = WkbConstants.getDimensionCount(typeAndDim); + boolean hasZ = WkbConstants.hasZ(typeAndDim); + boolean hasM = WkbConstants.hasM(typeAndDim); + + // Dispatch to appropriate reader based on geometry type + GeometryModel result = readGeometryData(geoType, defaultSrid, dimensionCount, hasZ, hasM, + typeStartPos); + + // Restore the saved byte order if this was a nested geometry + if (!isRootGeometry && savedByteOrder != null) { + buffer.order(savedByteOrder); + } + + return result; + } + + /** + * Reads geometry data based on the geometry type. 
+ */ + private GeometryModel readGeometryData( + GeoTypeId geoType, + int defaultSrid, + int dimensionCount, + boolean hasZ, + boolean hasM, + long typeStartPos) { + switch (geoType) { + case POINT: + return readPoint(defaultSrid, dimensionCount, hasZ, hasM); + case LINESTRING: + return readLineString(defaultSrid, dimensionCount, hasZ, hasM); + case POLYGON: + return readPolygon(defaultSrid, dimensionCount, hasZ, hasM); + case MULTI_POINT: + return readMultiPoint(defaultSrid, dimensionCount, hasZ, hasM); + case MULTI_LINESTRING: + return readMultiLineString(defaultSrid, dimensionCount, hasZ, hasM); + case MULTI_POLYGON: + return readMultiPolygon(defaultSrid, dimensionCount, hasZ, hasM); + case GEOMETRY_COLLECTION: + return readGeometryCollection(defaultSrid, dimensionCount, hasZ, hasM); + default: + throw new WkbParseException("Unsupported geometry type: " + geoType, typeStartPos, + currentWkb); + } + } + + /** + * Reads a top-level point geometry (allows empty points with NaN coordinates). + */ + private Point readPoint(int srid, int dimensionCount, boolean hasZ, boolean hasM) { + double[] coords = new double[dimensionCount]; + for (int i = 0; i < dimensionCount; i++) { + coords[i] = readDoubleAllowEmpty(); + } + return new Point(coords, srid, hasZ, hasM); + } + + /** + * Reads point coordinates for non-point geometries (does not allow empty/NaN). + */ + private Point readPointCoordinatesNoEmpty(int dimensionCount, boolean hasZ, boolean hasM) { + double[] coords = new double[dimensionCount]; + for (int i = 0; i < dimensionCount; i++) { + coords[i] = readDoubleNoEmpty(); + } + return new Point(coords, 0, hasZ, hasM); // SRID will be set by parent Review Comment: Is this true - will the SRID always be set by parent? Could you please point me to the code that ensures this, as well as the corresponding test cases? 
########## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/WkbConstants.java: ########## @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.util.geo; + +/** + * Utility class for WKB-related constants and helper methods. 
+ */ +public class WkbConstants { + // WKB byte order values + public static final byte BIG_ENDIAN = 0; + public static final byte LITTLE_ENDIAN = 1; + + // WKB base type codes (2D geometries) - matches GeoTypeId values + public static final int WKB_POINT = 1; + public static final int WKB_LINESTRING = 2; + public static final int WKB_POLYGON = 3; + public static final int WKB_MULTIPOINT = 4; + public static final int WKB_MULTILINESTRING = 5; + public static final int WKB_MULTIPOLYGON = 6; + public static final int WKB_GEOMETRYCOLLECTION = 7; + + // Min and max valid base type codes + public static final int WKB_MIN_TYPE = WKB_POINT; + public static final int WKB_MAX_TYPE = WKB_GEOMETRYCOLLECTION; + + // Dimensional offset multipliers (following OGC WKB spec) + // Type = BaseType + DIM_OFFSET * DimFactor + public static final int DIM_OFFSET_2D = 0; + public static final int DIM_OFFSET_Z = 1000; + public static final int DIM_OFFSET_M = 2000; + public static final int DIM_OFFSET_ZM = 3000; + + // WKB type codes for 3DZ geometries (Z coordinate) + public static final int WKB_POINT_Z = WKB_POINT + DIM_OFFSET_Z; + public static final int WKB_LINESTRING_Z = WKB_LINESTRING + DIM_OFFSET_Z; + public static final int WKB_POLYGON_Z = WKB_POLYGON + DIM_OFFSET_Z; + public static final int WKB_MULTIPOINT_Z = WKB_MULTIPOINT + DIM_OFFSET_Z; + public static final int WKB_MULTILINESTRING_Z = WKB_MULTILINESTRING + DIM_OFFSET_Z; + public static final int WKB_MULTIPOLYGON_Z = WKB_MULTIPOLYGON + DIM_OFFSET_Z; + public static final int WKB_GEOMETRYCOLLECTION_Z = WKB_GEOMETRYCOLLECTION + DIM_OFFSET_Z; + + // WKB type codes for 3DM geometries (M coordinate) + public static final int WKB_POINT_M = WKB_POINT + DIM_OFFSET_M; + public static final int WKB_LINESTRING_M = WKB_LINESTRING + DIM_OFFSET_M; + public static final int WKB_POLYGON_M = WKB_POLYGON + DIM_OFFSET_M; + public static final int WKB_MULTIPOINT_M = WKB_MULTIPOINT + DIM_OFFSET_M; + public static final int WKB_MULTILINESTRING_M = 
WKB_MULTILINESTRING + DIM_OFFSET_M; + public static final int WKB_MULTIPOLYGON_M = WKB_MULTIPOLYGON + DIM_OFFSET_M; + public static final int WKB_GEOMETRYCOLLECTION_M = WKB_GEOMETRYCOLLECTION + DIM_OFFSET_M; + + // WKB type codes for 4D geometries (Z and M coordinates) + public static final int WKB_POINT_ZM = WKB_POINT + DIM_OFFSET_ZM; + public static final int WKB_LINESTRING_ZM = WKB_LINESTRING + DIM_OFFSET_ZM; + public static final int WKB_POLYGON_ZM = WKB_POLYGON + DIM_OFFSET_ZM; + public static final int WKB_MULTIPOINT_ZM = WKB_MULTIPOINT + DIM_OFFSET_ZM; + public static final int WKB_MULTILINESTRING_ZM = WKB_MULTILINESTRING + DIM_OFFSET_ZM; + public static final int WKB_MULTIPOLYGON_ZM = WKB_MULTIPOLYGON + DIM_OFFSET_ZM; + public static final int WKB_GEOMETRYCOLLECTION_ZM = WKB_GEOMETRYCOLLECTION + DIM_OFFSET_ZM; + + // Size constants + public static final int BYTE_SIZE = 1; + public static final int INT_SIZE = 4; + public static final int DOUBLE_SIZE = 8; + + // Default SRID + public static final int DEFAULT_SRID = 0; Review Comment: Is this constant used anywhere? I think the SRID should be pushed in from callers instead? ########## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/WkbReader.java: ########## @@ -0,0 +1,475 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.util.geo; + + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.spark.sql.catalyst.util.geo.WkbConstants.DEFAULT_SRID; + +/** + * Reader for parsing Well-Known Binary (WKB) format geometries. + * This class implements the OGC Simple Features specification for WKB parsing. + */ +public class WkbReader { + private ByteBuffer buffer; + private final int validationLevel; + private byte[] currentWkb; + + public WkbReader() { + this(1); + } + + public WkbReader(int validationLevel) { + this.validationLevel = validationLevel; + } + + // ========== Coordinate Validation Helpers ========== + + /** + * Returns true if the coordinate value is valid for a non-empty point. + * A valid coordinate is finite (not NaN and not Infinity). + */ + private static boolean isValidCoordinate(double value) { + return Double.isFinite(value); + } + + /** + * Returns true if the coordinate value is valid for a point that may be empty. + * A valid coordinate is either finite or NaN (for empty points). + * Infinity values are not allowed. + */ + private static boolean isValidCoordinateAllowEmpty(double value) { + return Double.isFinite(value) || Double.isNaN(value); + } + + /** + * Reads a geometry from WKB bytes. + */ + public GeometryModel read(byte[] wkb) { + try { + currentWkb = wkb; + return readGeometry(DEFAULT_SRID); + } finally { + // Clear references to allow garbage collection + buffer = null; + currentWkb = null; + } + } + + /** + * Reads a geometry from WKB bytes with a specified SRID. 
+ */ + public GeometryModel read(byte[] wkb, int srid) { + try { + currentWkb = wkb; + return readGeometry(srid); + } finally { + // Clear references to allow garbage collection + buffer = null; + currentWkb = null; + } + } + + private void checkNotAtEnd(long pos) { + if (buffer.position() >= buffer.limit()) { + throw new WkbParseException("Unexpected end of WKB buffer", pos, currentWkb); + } + } + + private ByteOrder readEndianness() { + checkNotAtEnd(buffer.position()); + byte endianValue = buffer.get(); + if (endianValue != WkbConstants.BIG_ENDIAN && endianValue != WkbConstants.LITTLE_ENDIAN) { + throw new WkbParseException("Invalid byte order " + endianValue, buffer.position() - 1, + currentWkb); + } + return endianValue == WkbConstants.LITTLE_ENDIAN ? + ByteOrder.LITTLE_ENDIAN : ByteOrder.BIG_ENDIAN; + } + + private int readInt() { + if (buffer.remaining() < WkbConstants.INT_SIZE) { + checkNotAtEnd(buffer.limit()); + } + return buffer.getInt(); + } + + /** + * Reads a double coordinate value, allowing NaN for empty points. + */ + private double readDoubleAllowEmpty() { + if (buffer.remaining() < WkbConstants.DOUBLE_SIZE) { + checkNotAtEnd(buffer.limit()); + } + double value = buffer.getDouble(); + if (!isValidCoordinateAllowEmpty(value)) { + throw new WkbParseException("Invalid coordinate value found", buffer.position() - 8, + currentWkb); + } + return value; + } + + /** + * Reads a double coordinate value, not allowing NaN (for non-point coordinates like rings). + */ + private double readDoubleNoEmpty() { + if (buffer.remaining() < WkbConstants.DOUBLE_SIZE) { + checkNotAtEnd(buffer.limit()); + } + double value = buffer.getDouble(); + if (!isValidCoordinate(value)) { + throw new WkbParseException("Invalid coordinate value found", buffer.position() - 8, + currentWkb); + } + return value; + } + + /** + * Reads a geometry from WKB bytes with a specified SRID. 
+ * + * @param defaultSrid srid to use if not specified in WKB + * @return Geometry object + */ + private GeometryModel readGeometry(int defaultSrid) { + // Check that we have at least one byte for endianness + if (currentWkb == null || currentWkb.length < 1) { + throw new WkbParseException("WKB data is empty or null", 0, currentWkb); + } + + // Read endianness directly from the first byte + byte endianValue = currentWkb[0]; + if (endianValue > 1) { + throw new WkbParseException("Invalid byte order " + endianValue, 0, currentWkb); + } + ByteOrder byteOrder = endianValue == 1 ? ByteOrder.LITTLE_ENDIAN : ByteOrder.BIG_ENDIAN; + + // Check that we have enough bytes for the rest of the data + if (currentWkb.length < 5) { + throw new WkbParseException("WKB data too short", 0, currentWkb); + } + + // Create a new buffer wrapping the rest of the byte array (after the endianness byte) + buffer = ByteBuffer.wrap(currentWkb, 1, currentWkb.length - 1); + buffer.order(byteOrder); + + return readGeometryInternal(defaultSrid, true); + } + + /** + * Reads a nested geometry and validates that its dimensions match the parent's dimensions. + * The check happens immediately after reading the type, before reading coordinate data. + */ + private GeometryModel readNestedGeometryWithDimensionCheck( + int defaultSrid, + boolean expectedHasZ, + boolean expectedHasM) { + return readNestedGeometryInternal(defaultSrid, true, expectedHasZ, expectedHasM); + } + + /** + * Internal method to read a nested geometry with optional dimension validation. 
+ */ + private GeometryModel readNestedGeometryInternal( + int defaultSrid, + boolean validateDimensions, + boolean expectedHasZ, + boolean expectedHasM) { + // Read endianness from the current buffer position + ByteOrder byteOrder = readEndianness(); + + // Save the current byte order and temporarily set to the nested geometry's byte order + ByteOrder savedByteOrder = buffer.order(); + buffer.order(byteOrder); + long typeStartPos = buffer.position(); + + // Read type and dimension + int typeAndDim = readInt(); + + // Parse WKB format (geotype and dimension) + if (!WkbConstants.isValidWkbType(typeAndDim)) { + throw new WkbParseException("Invalid or unsupported type " + typeAndDim, typeStartPos, + currentWkb); + } + + int baseType = WkbConstants.getBaseType(typeAndDim); + GeoTypeId geoType = GeoTypeId.fromValue(baseType); + int dimensionCount = WkbConstants.getDimensionCount(typeAndDim); + boolean hasZ = WkbConstants.hasZ(typeAndDim); + boolean hasM = WkbConstants.hasM(typeAndDim); + + // Validate dimensions match parent if required + if (validateDimensions && (hasZ != expectedHasZ || hasM != expectedHasM)) { + throw new WkbParseException( + "Invalid or unsupported type " + typeAndDim, typeStartPos, currentWkb); + } + + // Dispatch to appropriate reader based on geometry type + GeometryModel result = readGeometryData(geoType, defaultSrid, dimensionCount, hasZ, hasM, + typeStartPos); + + // Restore the saved byte order + buffer.order(savedByteOrder); + + return result; + } + + /** + * Internal method to read geometry with optional endianness reading. 
+ * + * @param defaultSrid srid to use if not specified in WKB + * @param isRootGeometry if true, assumes endianness has already been read and buffer is set up; + * if false, reads endianness from current buffer position + * @return Geometry object + */ + private GeometryModel readGeometryInternal(int defaultSrid, boolean isRootGeometry) { + ByteOrder savedByteOrder = null; + long typeStartPos; + + if (isRootGeometry) { + // For root geometry, endianness has already been read and buffer is set up + typeStartPos = 1; + } else { + // For nested geometry, read endianness from the current buffer position + ByteOrder byteOrder = readEndianness(); + + // Save the current byte order and temporarily set to the nested geometry's byte order + savedByteOrder = buffer.order(); + buffer.order(byteOrder); + typeStartPos = buffer.position(); Review Comment: Do we ever enter this branch? Do we have tests for this? ########## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/GeometryCollection.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.util.geo; + +import java.util.List; + +/** + * Represents a GeometryCollection geometry. 
+ */ +class GeometryCollection extends GeometryModel { + private final List<GeometryModel> geometries; + private final boolean hasZ; + private final boolean hasM; + + GeometryCollection(List<GeometryModel> geometries, int srid, boolean hasZ, boolean hasM) { + super(GeoTypeId.GEOMETRY_COLLECTION, srid); + this.geometries = geometries; + this.hasZ = hasZ; + this.hasM = hasM; + } + + List<GeometryModel> getGeometries() { + return geometries; + } + + int getNumGeometries() { + return geometries.size(); + } + + @Override + boolean isEmpty() { + return geometries.isEmpty() || geometries.stream().allMatch(GeometryModel::isEmpty); + } + + @Override + int getDimensionCount() { + return 2 + (hasZ ? 1 : 0) + (hasM ? 1 : 0); + } + + @Override + boolean hasZ() { + return hasZ; + } + + @Override + boolean hasM() { + return hasM; + } + + @Override + public String toString() { + if (isEmpty()) { + return "GEOMETRYCOLLECTION EMPTY"; + } + return "GEOMETRYCOLLECTION (" + geometries.size() + " geometries)"; + } Review Comment: I see that all of the classes implement `toString`. Do we need this? ########## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/Point.java: ########## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.util.geo; + +import java.util.Arrays; + +/** + * Represents a point geometry with coordinates. + */ +class Point extends GeometryModel { + private final double[] coordinates; + private final boolean is_Empty; Review Comment: ```suggestion private final boolean isEmpty; ``` ########## sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/util/geo/WkbReaderWriterAdvancedTest.java: ########## @@ -0,0 +1,1160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util.geo; + +import org.apache.spark.sql.catalyst.util.Geometry; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.nio.ByteOrder; +import java.util.List; + +/** + * Test suite for WKB (Well-Known Binary) reader and writer functionality. + * + * These tests verify WKB round-trip (read/write) for various geometry types + * in different dimensions (2D, 3DZ, 3DM, 4D). 
+ */ +public class WkbReaderWriterAdvancedTest { + + /** + * Helper method to convert hex string to byte array + */ + private byte[] hexToBytes(String hex) { + int len = hex.length(); + byte[] data = new byte[len / 2]; + for (int i = 0; i < len; i += 2) { + data[i / 2] = (byte) ((Character.digit(hex.charAt(i), 16) << 4) + + Character.digit(hex.charAt(i + 1), 16)); + } + return data; + } + + /** + * Helper method to convert byte array to hex string + */ + private String bytesToHex(byte[] bytes) { + StringBuilder sb = new StringBuilder(); + for (byte b : bytes) { + sb.append(String.format("%02x", b)); + } + return sb.toString(); + } Review Comment: I think we already have this as part of `WkbReaderWriterGeometryModelTest`. Can we avoid duplication? ########## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/WkbReader.java: ########## @@ -0,0 +1,475 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.catalyst.util.geo; + + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.spark.sql.catalyst.util.geo.WkbConstants.DEFAULT_SRID; + +/** + * Reader for parsing Well-Known Binary (WKB) format geometries. + * This class implements the OGC Simple Features specification for WKB parsing. + */ +public class WkbReader { + private ByteBuffer buffer; + private final int validationLevel; + private byte[] currentWkb; + + public WkbReader() { + this(1); + } + + public WkbReader(int validationLevel) { + this.validationLevel = validationLevel; + } + + // ========== Coordinate Validation Helpers ========== + + /** + * Returns true if the coordinate value is valid for a non-empty point. + * A valid coordinate is finite (not NaN and not Infinity). + */ + private static boolean isValidCoordinate(double value) { + return Double.isFinite(value); + } + + /** + * Returns true if the coordinate value is valid for a point that may be empty. + * A valid coordinate is either finite or NaN (for empty points). + * Infinity values are not allowed. + */ + private static boolean isValidCoordinateAllowEmpty(double value) { + return Double.isFinite(value) || Double.isNaN(value); + } + + /** + * Reads a geometry from WKB bytes. + */ + public GeometryModel read(byte[] wkb) { + try { + currentWkb = wkb; + return readGeometry(DEFAULT_SRID); + } finally { + // Clear references to allow garbage collection + buffer = null; + currentWkb = null; + } + } + + /** + * Reads a geometry from WKB bytes with a specified SRID. 
+ */ + public GeometryModel read(byte[] wkb, int srid) { + try { + currentWkb = wkb; + return readGeometry(srid); + } finally { + // Clear references to allow garbage collection + buffer = null; + currentWkb = null; + } + } + + private void checkNotAtEnd(long pos) { + if (buffer.position() >= buffer.limit()) { + throw new WkbParseException("Unexpected end of WKB buffer", pos, currentWkb); + } + } + + private ByteOrder readEndianness() { + checkNotAtEnd(buffer.position()); + byte endianValue = buffer.get(); + if (endianValue != WkbConstants.BIG_ENDIAN && endianValue != WkbConstants.LITTLE_ENDIAN) { + throw new WkbParseException("Invalid byte order " + endianValue, buffer.position() - 1, + currentWkb); + } + return endianValue == WkbConstants.LITTLE_ENDIAN ? + ByteOrder.LITTLE_ENDIAN : ByteOrder.BIG_ENDIAN; + } + + private int readInt() { + if (buffer.remaining() < WkbConstants.INT_SIZE) { Review Comment: Is this enough? Can we get a `BufferUnderflowException` here, in the case when there are some remaining bytes (but fewer than INT_SIZE)? ########## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/WkbReader.java: ########## @@ -0,0 +1,475 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.catalyst.util.geo; + + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.spark.sql.catalyst.util.geo.WkbConstants.DEFAULT_SRID; + +/** + * Reader for parsing Well-Known Binary (WKB) format geometries. + * This class implements the OGC Simple Features specification for WKB parsing. Review Comment: I think it's worth noting that this class is not intended to be thread safe, same as WkbWriter. ########## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/STUtils.java: ########## @@ -58,6 +63,17 @@ public static GeometryVal geographyToGeometry(GeographyVal geographyVal) { return toPhysVal(Geometry.fromBytes(geographyVal.getBytes())); } + public static void parseWKB(byte[] wkb, int srid) { + new WkbReader().read(wkb, srid); + } + + public static GeometryVal physicalValFromWKB(byte[] wkb, int srid) { + byte[] bytes = new byte[HEADER_SIZE + wkb.length]; + ByteBuffer.wrap(bytes).order(DEFAULT_ENDIANNESS).putInt(srid); + System.arraycopy(wkb, 0, bytes, WKB_OFFSET, wkb.length); + return GeometryVal.fromBytes(bytes); + } Review Comment: > Also, `STUtils` shouldn't know about what's inside a `GeometryVal` byte buffer. This is especially important if the header layout ever changes (e.g., we add more metadata) – we don’t want subtle mismatches across the codebase. ########## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/WkbReader.java: ########## @@ -0,0 +1,475 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.util.geo; + + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.spark.sql.catalyst.util.geo.WkbConstants.DEFAULT_SRID; + +/** + * Reader for parsing Well-Known Binary (WKB) format geometries. + * This class implements the OGC Simple Features specification for WKB parsing. + */ +public class WkbReader { + private ByteBuffer buffer; + private final int validationLevel; + private byte[] currentWkb; + + public WkbReader() { + this(1); + } + + public WkbReader(int validationLevel) { + this.validationLevel = validationLevel; + } + + // ========== Coordinate Validation Helpers ========== + + /** + * Returns true if the coordinate value is valid for a non-empty point. + * A valid coordinate is finite (not NaN and not Infinity). + */ + private static boolean isValidCoordinate(double value) { + return Double.isFinite(value); + } + + /** + * Returns true if the coordinate value is valid for a point that may be empty. + * A valid coordinate is either finite or NaN (for empty points). + * Infinity values are not allowed. + */ + private static boolean isValidCoordinateAllowEmpty(double value) { + return Double.isFinite(value) || Double.isNaN(value); + } + + /** + * Reads a geometry from WKB bytes. 
+ */ + public GeometryModel read(byte[] wkb) { + try { + currentWkb = wkb; + return readGeometry(DEFAULT_SRID); + } finally { + // Clear references to allow garbage collection + buffer = null; + currentWkb = null; + } + } + + /** + * Reads a geometry from WKB bytes with a specified SRID. + */ + public GeometryModel read(byte[] wkb, int srid) { + try { + currentWkb = wkb; + return readGeometry(srid); + } finally { + // Clear references to allow garbage collection + buffer = null; + currentWkb = null; + } + } + + private void checkNotAtEnd(long pos) { + if (buffer.position() >= buffer.limit()) { + throw new WkbParseException("Unexpected end of WKB buffer", pos, currentWkb); + } + } + + private ByteOrder readEndianness() { + checkNotAtEnd(buffer.position()); + byte endianValue = buffer.get(); + if (endianValue != WkbConstants.BIG_ENDIAN && endianValue != WkbConstants.LITTLE_ENDIAN) { + throw new WkbParseException("Invalid byte order " + endianValue, buffer.position() - 1, + currentWkb); + } + return endianValue == WkbConstants.LITTLE_ENDIAN ? + ByteOrder.LITTLE_ENDIAN : ByteOrder.BIG_ENDIAN; + } + + private int readInt() { + if (buffer.remaining() < WkbConstants.INT_SIZE) { + checkNotAtEnd(buffer.limit()); + } + return buffer.getInt(); + } + + /** + * Reads a double coordinate value, allowing NaN for empty points. + */ + private double readDoubleAllowEmpty() { + if (buffer.remaining() < WkbConstants.DOUBLE_SIZE) { + checkNotAtEnd(buffer.limit()); + } + double value = buffer.getDouble(); + if (!isValidCoordinateAllowEmpty(value)) { + throw new WkbParseException("Invalid coordinate value found", buffer.position() - 8, + currentWkb); + } + return value; + } + + /** + * Reads a double coordinate value, not allowing NaN (for non-point coordinates like rings). 
+ */ + private double readDoubleNoEmpty() { + if (buffer.remaining() < WkbConstants.DOUBLE_SIZE) { + checkNotAtEnd(buffer.limit()); + } + double value = buffer.getDouble(); + if (!isValidCoordinate(value)) { + throw new WkbParseException("Invalid coordinate value found", buffer.position() - 8, + currentWkb); + } + return value; + } + + /** + * Reads a geometry from WKB bytes with a specified SRID. + * + * @param defaultSrid srid to use if not specified in WKB + * @return Geometry object + */ + private GeometryModel readGeometry(int defaultSrid) { + // Check that we have at least one byte for endianness + if (currentWkb == null || currentWkb.length < 1) { + throw new WkbParseException("WKB data is empty or null", 0, currentWkb); + } + + // Read endianness directly from the first byte + byte endianValue = currentWkb[0]; + if (endianValue > 1) { + throw new WkbParseException("Invalid byte order " + endianValue, 0, currentWkb); + } + ByteOrder byteOrder = endianValue == 1 ? ByteOrder.LITTLE_ENDIAN : ByteOrder.BIG_ENDIAN; + + // Check that we have enough bytes for the rest of the data + if (currentWkb.length < 5) { + throw new WkbParseException("WKB data too short", 0, currentWkb); + } + + // Create a new buffer wrapping the rest of the byte array (after the endianness byte) + buffer = ByteBuffer.wrap(currentWkb, 1, currentWkb.length - 1); + buffer.order(byteOrder); + + return readGeometryInternal(defaultSrid, true); + } + + /** + * Reads a nested geometry and validates that its dimensions match the parent's dimensions. + * The check happens immediately after reading the type, before reading coordinate data. + */ + private GeometryModel readNestedGeometryWithDimensionCheck( + int defaultSrid, + boolean expectedHasZ, + boolean expectedHasM) { + return readNestedGeometryInternal(defaultSrid, true, expectedHasZ, expectedHasM); + } + + /** + * Internal method to read a nested geometry with optional dimension validation. 
+ */ + private GeometryModel readNestedGeometryInternal( + int defaultSrid, + boolean validateDimensions, + boolean expectedHasZ, + boolean expectedHasM) { + // Read endianness from the current buffer position + ByteOrder byteOrder = readEndianness(); + + // Save the current byte order and temporarily set to the nested geometry's byte order + ByteOrder savedByteOrder = buffer.order(); + buffer.order(byteOrder); + long typeStartPos = buffer.position(); + + // Read type and dimension + int typeAndDim = readInt(); + + // Parse WKB format (geotype and dimension) + if (!WkbConstants.isValidWkbType(typeAndDim)) { + throw new WkbParseException("Invalid or unsupported type " + typeAndDim, typeStartPos, + currentWkb); + } + + int baseType = WkbConstants.getBaseType(typeAndDim); + GeoTypeId geoType = GeoTypeId.fromValue(baseType); + int dimensionCount = WkbConstants.getDimensionCount(typeAndDim); + boolean hasZ = WkbConstants.hasZ(typeAndDim); + boolean hasM = WkbConstants.hasM(typeAndDim); + + // Validate dimensions match parent if required + if (validateDimensions && (hasZ != expectedHasZ || hasM != expectedHasM)) { + throw new WkbParseException( + "Invalid or unsupported type " + typeAndDim, typeStartPos, currentWkb); + } + + // Dispatch to appropriate reader based on geometry type + GeometryModel result = readGeometryData(geoType, defaultSrid, dimensionCount, hasZ, hasM, + typeStartPos); + + // Restore the saved byte order + buffer.order(savedByteOrder); + + return result; + } + + /** + * Internal method to read geometry with optional endianness reading. 
+ * + * @param defaultSrid srid to use if not specified in WKB + * @param isRootGeometry if true, assumes endianness has already been read and buffer is set up; + * if false, reads endianness from current buffer position + * @return Geometry object + */ + private GeometryModel readGeometryInternal(int defaultSrid, boolean isRootGeometry) { + ByteOrder savedByteOrder = null; + long typeStartPos; + + if (isRootGeometry) { + // For root geometry, endianness has already been read and buffer is set up + typeStartPos = 1; + } else { + // For nested geometry, read endianness from the current buffer position + ByteOrder byteOrder = readEndianness(); + + // Save the current byte order and temporarily set to the nested geometry's byte order + savedByteOrder = buffer.order(); + buffer.order(byteOrder); + typeStartPos = buffer.position(); + } + + // Read type and dimension + int typeAndDim = readInt(); + + GeoTypeId geoType; + int dimensionCount; + + // Parse WKB format (geotype and dimension) + if (!WkbConstants.isValidWkbType(typeAndDim)) { + throw new WkbParseException("Invalid or unsupported type " + typeAndDim, typeStartPos, + currentWkb); + } + + int baseType = WkbConstants.getBaseType(typeAndDim); + geoType = GeoTypeId.fromValue(baseType); + dimensionCount = WkbConstants.getDimensionCount(typeAndDim); + boolean hasZ = WkbConstants.hasZ(typeAndDim); + boolean hasM = WkbConstants.hasM(typeAndDim); + + // Dispatch to appropriate reader based on geometry type + GeometryModel result = readGeometryData(geoType, defaultSrid, dimensionCount, hasZ, hasM, + typeStartPos); + + // Restore the saved byte order if this was a nested geometry + if (!isRootGeometry && savedByteOrder != null) { + buffer.order(savedByteOrder); + } + + return result; + } + + /** + * Reads geometry data based on the geometry type. 
+ */ + private GeometryModel readGeometryData( + GeoTypeId geoType, + int defaultSrid, + int dimensionCount, + boolean hasZ, + boolean hasM, + long typeStartPos) { + switch (geoType) { + case POINT: + return readPoint(defaultSrid, dimensionCount, hasZ, hasM); + case LINESTRING: + return readLineString(defaultSrid, dimensionCount, hasZ, hasM); + case POLYGON: + return readPolygon(defaultSrid, dimensionCount, hasZ, hasM); + case MULTI_POINT: + return readMultiPoint(defaultSrid, dimensionCount, hasZ, hasM); + case MULTI_LINESTRING: + return readMultiLineString(defaultSrid, dimensionCount, hasZ, hasM); + case MULTI_POLYGON: + return readMultiPolygon(defaultSrid, dimensionCount, hasZ, hasM); + case GEOMETRY_COLLECTION: + return readGeometryCollection(defaultSrid, dimensionCount, hasZ, hasM); + default: + throw new WkbParseException("Unsupported geometry type: " + geoType, typeStartPos, + currentWkb); + } + } + + /** + * Reads a top-level point geometry (allows empty points with NaN coordinates). + */ + private Point readPoint(int srid, int dimensionCount, boolean hasZ, boolean hasM) { + double[] coords = new double[dimensionCount]; + for (int i = 0; i < dimensionCount; i++) { + coords[i] = readDoubleAllowEmpty(); + } + return new Point(coords, srid, hasZ, hasM); + } + + /** + * Reads point coordinates for non-point geometries (does not allow empty/NaN). + */ + private Point readPointCoordinatesNoEmpty(int dimensionCount, boolean hasZ, boolean hasM) { + double[] coords = new double[dimensionCount]; + for (int i = 0; i < dimensionCount; i++) { + coords[i] = readDoubleNoEmpty(); + } + return new Point(coords, 0, hasZ, hasM); // SRID will be set by parent Review Comment: On another note, I think we should avoid hard-coding arguments anywhere in the code... Let's use centralized (and unified) constants for this kind of scenario, whenever possible. 
########## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/WkbReader.java: ########## @@ -0,0 +1,475 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.util.geo; + + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.spark.sql.catalyst.util.geo.WkbConstants.DEFAULT_SRID; + +/** + * Reader for parsing Well-Known Binary (WKB) format geometries. + * This class implements the OGC Simple Features specification for WKB parsing. + */ +public class WkbReader { + private ByteBuffer buffer; + private final int validationLevel; Review Comment: Can we use a comment to explain the validation level semantics in a bit more detail? ########## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Dimension.java: ########## Review Comment: Do we use this class anywhere? ########## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/Point.java: ########## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.util.geo; + +import java.util.Arrays; + +/** + * Represents a point geometry with coordinates. + */ +class Point extends GeometryModel { + private final double[] coordinates; + private final boolean is_Empty; Review Comment: I think it's a good idea to cache emptiness, but the field name could just be a bit more in line with the overall code style. ########## sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/util/geo/WkbErrorHandlingTest.java: ########## @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util.geo; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; + +/** + * Test suite for WKB error handling and edge cases. + */ +public class WkbErrorHandlingTest { + + private byte[] hexToBytes(String hex) { + int len = hex.length(); + byte[] data = new byte[len / 2]; + for (int i = 0; i < len; i += 2) { + data[i / 2] = (byte) ((Character.digit(hex.charAt(i), 16) << 4) + + Character.digit(hex.charAt(i + 1), 16)); + } + return data; + } + + /** + * Helper method to assert that parsing a WKB hex string throws WkbParseException + * containing the expected error message and the original WKB hex. + */ + private void assertParseError(String hex, String expectedMessagePart) { + assertParseError(hex, expectedMessagePart, 1); + } + + /** + * Helper method to assert that parsing a WKB hex string throws WkbParseException + * containing the expected error message and the original WKB hex, with specified + * validation level. 
+ */ + private void assertParseError(String hex, String expectedMessagePart, int validationLevel) { + byte[] wkb = hexToBytes(hex); + WkbReader reader = new WkbReader(validationLevel); + WkbParseException ex = Assertions.assertThrows( + WkbParseException.class, () -> reader.read(wkb), + "Should throw WkbParseException for WKB: " + hex); + Assertions.assertTrue(ex.getMessage().toUpperCase().contains(hex.toUpperCase()), + "Exception message should contain the WKB hex: " + hex + ", actual: " + ex.getMessage()); + if (expectedMessagePart != null && !expectedMessagePart.isEmpty()) { + Assertions.assertTrue( + ex.getMessage().toLowerCase().contains(expectedMessagePart.toLowerCase()), + "Exception message should contain '" + expectedMessagePart + "', actual: " + + ex.getMessage()); + } + } + + @Test + public void testEmptyWkb() { + byte[] emptyWkb = new byte[0]; + WkbReader reader = new WkbReader(); + WkbParseException ex = Assertions.assertThrows( + WkbParseException.class, () -> reader.read(emptyWkb)); + // Empty WKB produces empty hex string, so just verify exception was thrown + Assertions.assertNotNull(ex.getMessage()); + } + + @Test + public void testTooShortWkb() { + // Only endianness byte + String hex = "01"; + byte[] tooShort = hexToBytes(hex); + WkbReader reader = new WkbReader(); + WkbParseException ex = Assertions.assertThrows( + WkbParseException.class, () -> reader.read(tooShort)); + Assertions.assertTrue(ex.getMessage().toUpperCase().contains(hex.toUpperCase()), + "Exception message should contain the WKB hex: " + hex); + } + + @Test + public void testInvalidGeometryTypeZero() { + // Type = 0 (invalid, should be 1-7) + String hex = "0100000000000000000000F03F0000000000000040"; + byte[] invalidType = hexToBytes(hex); + WkbReader reader = new WkbReader(); + WkbParseException ex = Assertions.assertThrows( + WkbParseException.class, () -> reader.read(invalidType)); + Assertions.assertTrue(ex.getMessage().toUpperCase().contains(hex.toUpperCase()), + "Exception 
message should contain the WKB hex: " + hex); + } + + @Test + public void testTruncatedPointCoordinates() { + // Point WKB with truncated coordinates (missing Y coordinate) + String hex = "0101000000000000000000F03F"; + byte[] truncated = hexToBytes(hex); + WkbReader reader = new WkbReader(); + WkbParseException ex = Assertions.assertThrows( + WkbParseException.class, () -> reader.read(truncated)); + Assertions.assertTrue(ex.getMessage().toUpperCase().contains(hex.toUpperCase()), + "Exception message should contain the WKB hex: " + hex); + } + Review Comment: One additional test can be: ``` // Only one byte (FF) of the 4-byte INT field. String hex = "0102000000ff"; ``` ########## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/WkbReader.java: ########## @@ -0,0 +1,475 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.util.geo; + + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.spark.sql.catalyst.util.geo.WkbConstants.DEFAULT_SRID; + +/** + * Reader for parsing Well-Known Binary (WKB) format geometries. 
+ * This class implements the OGC Simple Features specification for WKB parsing. + */ +public class WkbReader { + private ByteBuffer buffer; + private final int validationLevel; + private byte[] currentWkb; + + public WkbReader() { + this(1); + } + + public WkbReader(int validationLevel) { + this.validationLevel = validationLevel; + } + + // ========== Coordinate Validation Helpers ========== + + /** + * Returns true if the coordinate value is valid for a non-empty point. + * A valid coordinate is finite (not NaN and not Infinity). + */ + private static boolean isValidCoordinate(double value) { + return Double.isFinite(value); + } + + /** + * Returns true if the coordinate value is valid for a point that may be empty. + * A valid coordinate is either finite or NaN (for empty points). + * Infinity values are not allowed. + */ + private static boolean isValidCoordinateAllowEmpty(double value) { + return Double.isFinite(value) || Double.isNaN(value); + } + + /** + * Reads a geometry from WKB bytes. + */ + public GeometryModel read(byte[] wkb) { + try { + currentWkb = wkb; + return readGeometry(DEFAULT_SRID); + } finally { + // Clear references to allow garbage collection + buffer = null; + currentWkb = null; + } + } + + /** + * Reads a geometry from WKB bytes with a specified SRID. 
+ */ + public GeometryModel read(byte[] wkb, int srid) { + try { + currentWkb = wkb; + return readGeometry(srid); + } finally { + // Clear references to allow garbage collection + buffer = null; + currentWkb = null; + } + } + + private void checkNotAtEnd(long pos) { + if (buffer.position() >= buffer.limit()) { + throw new WkbParseException("Unexpected end of WKB buffer", pos, currentWkb); + } + } + + private ByteOrder readEndianness() { + checkNotAtEnd(buffer.position()); + byte endianValue = buffer.get(); + if (endianValue != WkbConstants.BIG_ENDIAN && endianValue != WkbConstants.LITTLE_ENDIAN) { + throw new WkbParseException("Invalid byte order " + endianValue, buffer.position() - 1, + currentWkb); + } + return endianValue == WkbConstants.LITTLE_ENDIAN ? + ByteOrder.LITTLE_ENDIAN : ByteOrder.BIG_ENDIAN; + } + + private int readInt() { + if (buffer.remaining() < WkbConstants.INT_SIZE) { + checkNotAtEnd(buffer.limit()); + } + return buffer.getInt(); + } + + /** + * Reads a double coordinate value, allowing NaN for empty points. + */ + private double readDoubleAllowEmpty() { + if (buffer.remaining() < WkbConstants.DOUBLE_SIZE) { + checkNotAtEnd(buffer.limit()); + } + double value = buffer.getDouble(); + if (!isValidCoordinateAllowEmpty(value)) { + throw new WkbParseException("Invalid coordinate value found", buffer.position() - 8, + currentWkb); + } + return value; + } + + /** + * Reads a double coordinate value, not allowing NaN (for non-point coordinates like rings). + */ + private double readDoubleNoEmpty() { + if (buffer.remaining() < WkbConstants.DOUBLE_SIZE) { + checkNotAtEnd(buffer.limit()); + } + double value = buffer.getDouble(); + if (!isValidCoordinate(value)) { + throw new WkbParseException("Invalid coordinate value found", buffer.position() - 8, + currentWkb); + } + return value; + } + + /** + * Reads a geometry from WKB bytes with a specified SRID. 
+ * + * @param defaultSrid srid to use if not specified in WKB + * @return Geometry object + */ + private GeometryModel readGeometry(int defaultSrid) { + // Check that we have at least one byte for endianness + if (currentWkb == null || currentWkb.length < 1) { + throw new WkbParseException("WKB data is empty or null", 0, currentWkb); + } + + // Read endianness directly from the first byte + byte endianValue = currentWkb[0]; + if (endianValue > 1) { + throw new WkbParseException("Invalid byte order " + endianValue, 0, currentWkb); + } + ByteOrder byteOrder = endianValue == 1 ? ByteOrder.LITTLE_ENDIAN : ByteOrder.BIG_ENDIAN; + + // Check that we have enough bytes for the rest of the data + if (currentWkb.length < 5) { + throw new WkbParseException("WKB data too short", 0, currentWkb); + } + + // Create a new buffer wrapping the rest of the byte array (after the endianness byte) + buffer = ByteBuffer.wrap(currentWkb, 1, currentWkb.length - 1); + buffer.order(byteOrder); + + return readGeometryInternal(defaultSrid, true); + } + + /** + * Reads a nested geometry and validates that its dimensions match the parent's dimensions. + * The check happens immediately after reading the type, before reading coordinate data. + */ + private GeometryModel readNestedGeometryWithDimensionCheck( + int defaultSrid, + boolean expectedHasZ, + boolean expectedHasM) { + return readNestedGeometryInternal(defaultSrid, true, expectedHasZ, expectedHasM); + } + + /** + * Internal method to read a nested geometry with optional dimension validation. 
+ */ + private GeometryModel readNestedGeometryInternal( + int defaultSrid, + boolean validateDimensions, + boolean expectedHasZ, + boolean expectedHasM) { + // Read endianness from the current buffer position + ByteOrder byteOrder = readEndianness(); + + // Save the current byte order and temporarily set to the nested geometry's byte order + ByteOrder savedByteOrder = buffer.order(); + buffer.order(byteOrder); + long typeStartPos = buffer.position(); + + // Read type and dimension + int typeAndDim = readInt(); + + // Parse WKB format (geotype and dimension) + if (!WkbConstants.isValidWkbType(typeAndDim)) { + throw new WkbParseException("Invalid or unsupported type " + typeAndDim, typeStartPos, + currentWkb); + } + + int baseType = WkbConstants.getBaseType(typeAndDim); + GeoTypeId geoType = GeoTypeId.fromValue(baseType); + int dimensionCount = WkbConstants.getDimensionCount(typeAndDim); + boolean hasZ = WkbConstants.hasZ(typeAndDim); + boolean hasM = WkbConstants.hasM(typeAndDim); + + // Validate dimensions match parent if required + if (validateDimensions && (hasZ != expectedHasZ || hasM != expectedHasM)) { + throw new WkbParseException( + "Invalid or unsupported type " + typeAndDim, typeStartPos, currentWkb); + } + + // Dispatch to appropriate reader based on geometry type + GeometryModel result = readGeometryData(geoType, defaultSrid, dimensionCount, hasZ, hasM, + typeStartPos); + + // Restore the saved byte order + buffer.order(savedByteOrder); + + return result; + } + + /** + * Internal method to read geometry with optional endianness reading. 
+ * + * @param defaultSrid srid to use if not specified in WKB + * @param isRootGeometry if true, assumes endianness has already been read and buffer is set up; + * if false, reads endianness from current buffer position + * @return Geometry object + */ + private GeometryModel readGeometryInternal(int defaultSrid, boolean isRootGeometry) { + ByteOrder savedByteOrder = null; + long typeStartPos; + + if (isRootGeometry) { + // For root geometry, endianness has already been read and buffer is set up + typeStartPos = 1; + } else { + // For nested geometry, read endianness from the current buffer position + ByteOrder byteOrder = readEndianness(); + + // Save the current byte order and temporarily set to the nested geometry's byte order + savedByteOrder = buffer.order(); + buffer.order(byteOrder); + typeStartPos = buffer.position(); + } + + // Read type and dimension + int typeAndDim = readInt(); + + GeoTypeId geoType; + int dimensionCount; + + // Parse WKB format (geotype and dimension) + if (!WkbConstants.isValidWkbType(typeAndDim)) { + throw new WkbParseException("Invalid or unsupported type " + typeAndDim, typeStartPos, + currentWkb); + } + + int baseType = WkbConstants.getBaseType(typeAndDim); + geoType = GeoTypeId.fromValue(baseType); + dimensionCount = WkbConstants.getDimensionCount(typeAndDim); + boolean hasZ = WkbConstants.hasZ(typeAndDim); + boolean hasM = WkbConstants.hasM(typeAndDim); + + // Dispatch to appropriate reader based on geometry type + GeometryModel result = readGeometryData(geoType, defaultSrid, dimensionCount, hasZ, hasM, + typeStartPos); + + // Restore the saved byte order if this was a nested geometry + if (!isRootGeometry && savedByteOrder != null) { + buffer.order(savedByteOrder); + } + + return result; + } + + /** + * Reads geometry data based on the geometry type. 
+ */ + private GeometryModel readGeometryData( + GeoTypeId geoType, + int defaultSrid, + int dimensionCount, + boolean hasZ, + boolean hasM, + long typeStartPos) { + switch (geoType) { + case POINT: + return readPoint(defaultSrid, dimensionCount, hasZ, hasM); + case LINESTRING: + return readLineString(defaultSrid, dimensionCount, hasZ, hasM); + case POLYGON: + return readPolygon(defaultSrid, dimensionCount, hasZ, hasM); + case MULTI_POINT: + return readMultiPoint(defaultSrid, dimensionCount, hasZ, hasM); + case MULTI_LINESTRING: + return readMultiLineString(defaultSrid, dimensionCount, hasZ, hasM); + case MULTI_POLYGON: + return readMultiPolygon(defaultSrid, dimensionCount, hasZ, hasM); + case GEOMETRY_COLLECTION: + return readGeometryCollection(defaultSrid, dimensionCount, hasZ, hasM); + default: + throw new WkbParseException("Unsupported geometry type: " + geoType, typeStartPos, + currentWkb); + } + } + + /** + * Reads a top-level point geometry (allows empty points with NaN coordinates). + */ + private Point readPoint(int srid, int dimensionCount, boolean hasZ, boolean hasM) { + double[] coords = new double[dimensionCount]; + for (int i = 0; i < dimensionCount; i++) { + coords[i] = readDoubleAllowEmpty(); + } + return new Point(coords, srid, hasZ, hasM); + } + + /** + * Reads point coordinates for non-point geometries (does not allow empty/NaN). + */ + private Point readPointCoordinatesNoEmpty(int dimensionCount, boolean hasZ, boolean hasM) { + double[] coords = new double[dimensionCount]; + for (int i = 0; i < dimensionCount; i++) { + coords[i] = readDoubleNoEmpty(); + } + return new Point(coords, 0, hasZ, hasM); // SRID will be set by parent Review Comment: In short, I want to check whether this situation is possible: a MultiPoint has some SRID≠0, and its children (Points) have SRID=0.
########## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/WkbWriter.java: ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.util.geo; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +/** + * Utility class for converting geometries to Well-Known Binary (WKB) format. + * This class implements the OGC Simple Features specification for WKB writing. + * <p> + * This class is designed to support reuse of a single instance to write multiple + * geometries efficiently. This class is NOT thread-safe; each thread should create + * its own instance. + */ +public class WkbWriter { + + /** + * Gets the WKB type code for a geometry, including dimension offset. 
+ * For example: Point 2D = 1, Point Z = 1001, Point M = 2001, Point ZM = 3001 + */ + private static int getWkbType(GeometryModel geometry) { + int baseType = (int) geometry.getTypeId().getValue(); + boolean hasZ = geometry.hasZ(); + boolean hasM = geometry.hasM(); + + // Determine dimension offset based on hasZ and hasM flags + if (hasZ && hasM) { + return baseType + WkbConstants.DIM_OFFSET_ZM; + } else if (hasZ) { + return baseType + WkbConstants.DIM_OFFSET_Z; + } else if (hasM) { + return baseType + WkbConstants.DIM_OFFSET_M; + } else { + return baseType + WkbConstants.DIM_OFFSET_2D; + } + } + + // ========== Public Write Methods ========== + + /** + * Writes a geometry to WKB format. + */ + public byte[] write(GeometryModel geometry) { + return write(geometry, ByteOrder.LITTLE_ENDIAN); + } + + /** + * Writes a geometry to WKB format with specified byte order. + * <p> + * This method reuses an internal buffer to reduce GC pressure when writing + * many geometries. The returned byte array is a copy of the internal buffer. 
+ */ + public byte[] write(GeometryModel geometry, ByteOrder byteOrder) { + // Calculate size first + int size = calculateSize(geometry); + ByteBuffer buffer = ByteBuffer.allocate(size); + buffer.order(byteOrder); + writeGeometry(buffer, geometry, byteOrder); + // Return a copy of exactly the right size + return buffer.array(); + } + + // ========== Size Calculation ========== + + private int calculateSize(GeometryModel geometry) { + int dimCount = geometry.getDimensionCount(); + + // Header: endianness (1 byte) + type (4 bytes) + int size = WkbConstants.BYTE_SIZE + WkbConstants.INT_SIZE; + + if (geometry instanceof Point) { + Point point = (Point) geometry; + size += point.getCoordinates().length * WkbConstants.DOUBLE_SIZE; + } else if (geometry instanceof LineString) { + LineString lineString = (LineString) geometry; + // Number of points (4 bytes) + coordinates + size += WkbConstants.INT_SIZE; + size += lineString.getNumPoints() * dimCount * WkbConstants.DOUBLE_SIZE; + } else if (geometry instanceof Polygon) { + Polygon polygon = (Polygon) geometry; + // Number of rings (4 bytes) + size += WkbConstants.INT_SIZE; + for (Ring ring : polygon.getRings()) { + int numPoints = ring.getNumPoints(); + // Number of points in ring (4 bytes) + coordinates + size += WkbConstants.INT_SIZE; + size += numPoints * dimCount * WkbConstants.DOUBLE_SIZE; + } + } else if (geometry instanceof MultiPoint) { + MultiPoint mp = (MultiPoint) geometry; + // Number of geometries (4 bytes) + size += WkbConstants.INT_SIZE; + for (Point p : mp.getPoints()) { + size += calculateSize(p); + } + } else if (geometry instanceof MultiLineString) { + MultiLineString mls = (MultiLineString) geometry; + // Number of geometries (4 bytes) + size += WkbConstants.INT_SIZE; + for (LineString ls : mls.getLineStrings()) { + size += calculateSize(ls); + } + } else if (geometry instanceof MultiPolygon) { + MultiPolygon mpoly = (MultiPolygon) geometry; + // Number of geometries (4 bytes) + size += 
WkbConstants.INT_SIZE; + for (Polygon poly : mpoly.getPolygons()) { + size += calculateSize(poly); + } + } else if (geometry instanceof GeometryCollection) { + GeometryCollection gc = (GeometryCollection) geometry; + // Number of geometries (4 bytes) + size += WkbConstants.INT_SIZE; + for (GeometryModel geom : gc.getGeometries()) { + size += calculateSize(geom); + } + } + + return size; + } + + // ========== Write Methods ========== + + private void writeGeometry(ByteBuffer buffer, GeometryModel geometry, ByteOrder byteOrder) { + // Write endianness + buffer.put(byteOrder == ByteOrder.LITTLE_ENDIAN + ? WkbConstants.LITTLE_ENDIAN : WkbConstants.BIG_ENDIAN); + + // Write type + int type = getWkbType(geometry); + buffer.putInt(type); + + // Write geometry-specific data + if (geometry instanceof Point) { + writePoint(buffer, (Point) geometry); + } else if (geometry instanceof LineString) { + writeLineString(buffer, (LineString) geometry); + } else if (geometry instanceof Polygon) { + writePolygon(buffer, (Polygon) geometry); + } else if (geometry instanceof MultiPoint) { + writeMultiPoint(buffer, (MultiPoint) geometry, byteOrder); + } else if (geometry instanceof MultiLineString) { + writeMultiLineString(buffer, (MultiLineString) geometry, byteOrder); + } else if (geometry instanceof MultiPolygon) { + writeMultiPolygon(buffer, (MultiPolygon) geometry, byteOrder); + } else if (geometry instanceof GeometryCollection) { + writeGeometryCollection(buffer, (GeometryCollection) geometry, byteOrder); + } + } + + private void writePoint(ByteBuffer buffer, Point point) { + for (double coord : point.getCoordinates()) { + buffer.putDouble(coord); + } + } + + private void writeCoordinates(ByteBuffer buffer, Point point) { + for (double coord : point.getCoordinates()) { + buffer.putDouble(coord); + } + } Review Comment: Are these two identical? Can we keep just one of them? 
########## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/geo/GeometryCollection.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.util.geo; + +import java.util.List; + +/** + * Represents a GeometryCollection geometry. + */ +class GeometryCollection extends GeometryModel { + private final List<GeometryModel> geometries; + private final boolean hasZ; + private final boolean hasM; + + GeometryCollection(List<GeometryModel> geometries, int srid, boolean hasZ, boolean hasM) { + super(GeoTypeId.GEOMETRY_COLLECTION, srid); + this.geometries = geometries; + this.hasZ = hasZ; + this.hasM = hasM; + } + + List<GeometryModel> getGeometries() { + return geometries; + } + + int getNumGeometries() { + return geometries.size(); + } + + @Override + boolean isEmpty() { + return geometries.isEmpty() || geometries.stream().allMatch(GeometryModel::isEmpty); + } + + @Override + int getDimensionCount() { + return 2 + (hasZ ? 1 : 0) + (hasM ? 
1 : 0); + } + + @Override + boolean hasZ() { + return hasZ; + } + + @Override + boolean hasM() { + return hasM; + } + + @Override + public String toString() { + if (isEmpty()) { + return "GEOMETRYCOLLECTION EMPTY"; + } + return "GEOMETRYCOLLECTION (" + geometries.size() + " geometries)"; + } Review Comment: If we do need it, do we also guarantee full WKT format compliance? If so, let's consider delegating `toString()` to a method such as `toWkt()` instead of writing code directly in `toString`. If we don't want to explicitly expose it, we can keep that method private too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
