Re: [PR] [SPARK-47803][SQL] Support cast to variant. [spark]
cloud-fan closed pull request #45989: [SPARK-47803][SQL] Support cast to variant. URL: https://github.com/apache/spark/pull/45989 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47803][SQL] Support cast to variant. [spark]
cloud-fan commented on PR #45989: URL: https://github.com/apache/spark/pull/45989#issuecomment-2055240421 thanks, merging to master!
Re: [PR] [SPARK-47803][SQL] Support cast to variant. [spark]
cloud-fan commented on code in PR #45989: URL: https://github.com/apache/spark/pull/45989#discussion_r1565188338 ## common/variant/src/main/java/org/apache/spark/types/variant/VariantBuilder.java: ## @@ -100,6 +105,272 @@ private Variant result() { return new Variant(Arrays.copyOfRange(writeBuffer, 0, writePos), metadata); } + public void appendString(String str) { +byte[] text = str.getBytes(StandardCharsets.UTF_8); +boolean longStr = text.length > MAX_SHORT_STR_SIZE; +checkCapacity((longStr ? 1 + U32_SIZE : 1) + text.length); +if (longStr) { + writeBuffer[writePos++] = primitiveHeader(LONG_STR); + writeLong(writeBuffer, writePos, text.length, U32_SIZE); + writePos += U32_SIZE; +} else { + writeBuffer[writePos++] = shortStrHeader(text.length); +} +System.arraycopy(text, 0, writeBuffer, writePos, text.length); +writePos += text.length; + } + + public void appendNull() { +checkCapacity(1); +writeBuffer[writePos++] = primitiveHeader(NULL); + } + + public void appendBoolean(boolean b) { +checkCapacity(1); +writeBuffer[writePos++] = primitiveHeader(b ? TRUE : FALSE); + } + + // Append a long value to the variant builder. The actual used integer type depends on the value + // range of the long value. 
+ public void appendLong(long l) { +checkCapacity(1 + 8); +if (l == (byte) l) { + writeBuffer[writePos++] = primitiveHeader(INT1); + writeLong(writeBuffer, writePos, l, 1); + writePos += 1; +} else if (l == (short) l) { + writeBuffer[writePos++] = primitiveHeader(INT2); + writeLong(writeBuffer, writePos, l, 2); + writePos += 2; +} else if (l == (int) l) { + writeBuffer[writePos++] = primitiveHeader(INT4); + writeLong(writeBuffer, writePos, l, 4); + writePos += 4; +} else { + writeBuffer[writePos++] = primitiveHeader(INT8); + writeLong(writeBuffer, writePos, l, 8); + writePos += 8; +} + } + + public void appendDouble(double d) { +checkCapacity(1 + 8); +writeBuffer[writePos++] = primitiveHeader(DOUBLE); +writeLong(writeBuffer, writePos, Double.doubleToLongBits(d), 8); +writePos += 8; + } + + // Append a decimal value to the variant builder. The caller should guarantee that its precision + // and scale fit into `MAX_DECIMAL16_PRECISION`. + public void appendDecimal(BigDecimal d) { +checkCapacity(2 + 16); +BigInteger unscaled = d.unscaledValue(); +if (d.scale() <= MAX_DECIMAL4_PRECISION && d.precision() <= MAX_DECIMAL4_PRECISION) { + writeBuffer[writePos++] = primitiveHeader(DECIMAL4); + writeBuffer[writePos++] = (byte) d.scale(); + writeLong(writeBuffer, writePos, unscaled.intValueExact(), 4); + writePos += 4; +} else if (d.scale() <= MAX_DECIMAL8_PRECISION && d.precision() <= MAX_DECIMAL8_PRECISION) { + writeBuffer[writePos++] = primitiveHeader(DECIMAL8); + writeBuffer[writePos++] = (byte) d.scale(); + writeLong(writeBuffer, writePos, unscaled.longValueExact(), 8); + writePos += 8; +} else { + assert d.scale() <= MAX_DECIMAL16_PRECISION && d.precision() <= MAX_DECIMAL16_PRECISION; + writeBuffer[writePos++] = primitiveHeader(DECIMAL16); + writeBuffer[writePos++] = (byte) d.scale(); + // `toByteArray` returns a big-endian representation. We need to copy it reversely and sign + // extend it to 16 bytes. 
+ byte[] bytes = unscaled.toByteArray(); + for (int i = 0; i < bytes.length; ++i) { +writeBuffer[writePos + i] = bytes[bytes.length - 1 - i]; + } + byte sign = (byte) (bytes[0] < 0 ? -1 : 0); + for (int i = bytes.length; i < 16; ++i) { +writeBuffer[writePos + i] = sign; + } + writePos += 16; +} + } + + public void appendDate(int daysSinceEpoch) { Review Comment: Oh, I see: this write method writes metadata (the type header) as well, via `writeBuffer[writePos++] = primitiveHeader(DATE);`.
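The `DECIMAL16` branch in the quoted hunk reverses the big-endian output of `BigInteger.toByteArray()` and sign-extends it to 16 bytes. A minimal standalone sketch of that step follows. The class and method names (`Decimal16Sketch`, `toLittleEndian16`, `fromLittleEndian16`) are hypothetical and are not part of `VariantBuilder`; the decoder is included only to check the round trip.

```java
import java.math.BigInteger;

// Hypothetical helper class for illustration; not part of Spark.
final class Decimal16Sketch {
    // Encode an unscaled decimal value as 16 little-endian, sign-extended bytes.
    static byte[] toLittleEndian16(BigInteger unscaled) {
        // toByteArray() returns the minimal two's-complement form, big-endian.
        byte[] bigEndian = unscaled.toByteArray();
        if (bigEndian.length > 16) {
            throw new IllegalArgumentException("value does not fit in 16 bytes");
        }
        byte[] out = new byte[16];
        // Copy in reverse so the least significant byte comes first.
        for (int i = 0; i < bigEndian.length; i++) {
            out[i] = bigEndian[bigEndian.length - 1 - i];
        }
        // Fill the remaining high-order bytes with the sign (0x00 or 0xFF).
        byte sign = (byte) (bigEndian[0] < 0 ? -1 : 0);
        for (int i = bigEndian.length; i < 16; i++) {
            out[i] = sign;
        }
        return out;
    }

    // Decode by reversing back to big-endian and letting BigInteger read the sign.
    static BigInteger fromLittleEndian16(byte[] littleEndian) {
        byte[] bigEndian = new byte[littleEndian.length];
        for (int i = 0; i < littleEndian.length; i++) {
            bigEndian[i] = littleEndian[littleEndian.length - 1 - i];
        }
        return new BigInteger(bigEndian);
    }

    public static void main(String[] args) {
        BigInteger value = new BigInteger("-256");
        byte[] encoded = toLittleEndian16(value);
        System.out.println(fromLittleEndian16(encoded).equals(value));
    }
}
```

Without the sign fill, a negative value shorter than 16 bytes would be read back as a large positive number, which is why the padding byte depends on `bytes[0]`.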
Re: [PR] [SPARK-47803][SQL] Support cast to variant. [spark]
chenhao-db commented on code in PR #45989: URL: https://github.com/apache/spark/pull/45989#discussion_r1565141590 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala: ## @@ -41,4 +43,69 @@ object VariantExpressionEvalUtils { input.toString, BadRecordException(() => input, cause = e)) } } + + /** Cast a Spark value from `dataType` into the variant type. */ + def castToVariant(input: Any, dataType: DataType): VariantVal = { Review Comment: I think it is doable but involves quite a lot of boilerplate code. I would prefer to have the optimization in a separate follow-up PR.
Re: [PR] [SPARK-47803][SQL] Support cast to variant. [spark]
chenhao-db commented on code in PR #45989: URL: https://github.com/apache/spark/pull/45989#discussion_r1565141311 ## common/variant/src/main/java/org/apache/spark/types/variant/VariantBuilder.java: ## @@ -100,6 +105,272 @@ private Variant result() { return new Variant(Arrays.copyOfRange(writeBuffer, 0, writePos), metadata); } + public void appendString(String str) { +byte[] text = str.getBytes(StandardCharsets.UTF_8); +boolean longStr = text.length > MAX_SHORT_STR_SIZE; +checkCapacity((longStr ? 1 + U32_SIZE : 1) + text.length); +if (longStr) { + writeBuffer[writePos++] = primitiveHeader(LONG_STR); + writeLong(writeBuffer, writePos, text.length, U32_SIZE); + writePos += U32_SIZE; +} else { + writeBuffer[writePos++] = shortStrHeader(text.length); +} +System.arraycopy(text, 0, writeBuffer, writePos, text.length); +writePos += text.length; + } + + public void appendNull() { +checkCapacity(1); +writeBuffer[writePos++] = primitiveHeader(NULL); + } + + public void appendBoolean(boolean b) { +checkCapacity(1); +writeBuffer[writePos++] = primitiveHeader(b ? TRUE : FALSE); + } + + // Append a long value to the variant builder. The actual used integer type depends on the value + // range of the long value. 
+ public void appendLong(long l) { +checkCapacity(1 + 8); +if (l == (byte) l) { + writeBuffer[writePos++] = primitiveHeader(INT1); + writeLong(writeBuffer, writePos, l, 1); + writePos += 1; +} else if (l == (short) l) { + writeBuffer[writePos++] = primitiveHeader(INT2); + writeLong(writeBuffer, writePos, l, 2); + writePos += 2; +} else if (l == (int) l) { + writeBuffer[writePos++] = primitiveHeader(INT4); + writeLong(writeBuffer, writePos, l, 4); + writePos += 4; +} else { + writeBuffer[writePos++] = primitiveHeader(INT8); + writeLong(writeBuffer, writePos, l, 8); + writePos += 8; +} + } + + public void appendDouble(double d) { +checkCapacity(1 + 8); +writeBuffer[writePos++] = primitiveHeader(DOUBLE); +writeLong(writeBuffer, writePos, Double.doubleToLongBits(d), 8); +writePos += 8; + } + + // Append a decimal value to the variant builder. The caller should guarantee that its precision + // and scale fit into `MAX_DECIMAL16_PRECISION`. + public void appendDecimal(BigDecimal d) { +checkCapacity(2 + 16); +BigInteger unscaled = d.unscaledValue(); +if (d.scale() <= MAX_DECIMAL4_PRECISION && d.precision() <= MAX_DECIMAL4_PRECISION) { + writeBuffer[writePos++] = primitiveHeader(DECIMAL4); + writeBuffer[writePos++] = (byte) d.scale(); + writeLong(writeBuffer, writePos, unscaled.intValueExact(), 4); + writePos += 4; +} else if (d.scale() <= MAX_DECIMAL8_PRECISION && d.precision() <= MAX_DECIMAL8_PRECISION) { + writeBuffer[writePos++] = primitiveHeader(DECIMAL8); + writeBuffer[writePos++] = (byte) d.scale(); + writeLong(writeBuffer, writePos, unscaled.longValueExact(), 8); + writePos += 8; +} else { + assert d.scale() <= MAX_DECIMAL16_PRECISION && d.precision() <= MAX_DECIMAL16_PRECISION; + writeBuffer[writePos++] = primitiveHeader(DECIMAL16); + writeBuffer[writePos++] = (byte) d.scale(); + // `toByteArray` returns a big-endian representation. We need to copy it reversely and sign + // extend it to 16 bytes. 
+ byte[] bytes = unscaled.toByteArray(); + for (int i = 0; i < bytes.length; ++i) { +writeBuffer[writePos + i] = bytes[bytes.length - 1 - i]; + } + byte sign = (byte) (bytes[0] < 0 ? -1 : 0); + for (int i = bytes.length; i < 16; ++i) { +writeBuffer[writePos + i] = sign; + } + writePos += 16; +} + } + + public void appendDate(int daysSinceEpoch) { Review Comment: We do need it. Suppose a date value and an integer value share the same underlying physical integer value, and we cast both to the variant type. The resulting variant bytes must differ so that we can recover the original data types when we read them back. This affects the result of subsequent operations, e.g., the `to_json` representations will differ (date format vs. int format). This is also why the variant spec has datetime types as dedicated scalar types.
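The point above can be illustrated with a small sketch: two values with the same 4-byte payload stay distinguishable because each encoding starts with a type tag, and a reader renders them differently. The tag values, the fixed 4-byte payload, and the names `TaggedValueSketch`, `encode`, and `render` are assumptions made for illustration; they are not the real variant header layout or Spark's `to_json` implementation.

```java
import java.time.LocalDate;
import java.util.Arrays;

// Hypothetical illustration; tag values are NOT the real variant type ids.
final class TaggedValueSketch {
    static final byte TAG_INT4 = 1;
    static final byte TAG_DATE = 2;

    // One tag byte followed by the value as 4 little-endian bytes.
    static byte[] encode(byte tag, int value) {
        byte[] out = new byte[5];
        out[0] = tag;
        for (int i = 0; i < 4; i++) {
            out[1 + i] = (byte) (value >>> (8 * i));
        }
        return out;
    }

    // Reassemble the little-endian payload that follows the tag byte.
    static int payload(byte[] encoded) {
        int value = 0;
        for (int i = 0; i < 4; i++) {
            value |= (encoded[1 + i] & 0xFF) << (8 * i);
        }
        return value;
    }

    // Render in a JSON-like form: the tag decides how the same payload is shown.
    static String render(byte[] encoded) {
        int value = payload(encoded);
        switch (encoded[0]) {
            case TAG_INT4:
                return Integer.toString(value);
            case TAG_DATE:
                return "\"" + LocalDate.ofEpochDay(value) + "\"";
            default:
                throw new IllegalArgumentException("unknown tag: " + encoded[0]);
        }
    }

    public static void main(String[] args) {
        byte[] asInt = encode(TAG_INT4, 19000);
        byte[] asDate = encode(TAG_DATE, 19000);
        System.out.println(Arrays.equals(asInt, asDate)); // the tag byte differs
        System.out.println(render(asInt));
        System.out.println(render(asDate));
    }
}
```

If the encoder dropped the tag and wrote only the physical integer, `render` would have no way to choose between the two output formats.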
Re: [PR] [SPARK-47803][SQL] Support cast to variant. [spark]
cloud-fan commented on code in PR #45989: URL: https://github.com/apache/spark/pull/45989#discussion_r1565128648 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala: ## @@ -41,4 +43,69 @@ object VariantExpressionEvalUtils { input.toString, BadRecordException(() => input, cause = e)) } } + + /** Cast a Spark value from `dataType` into the variant type. */ + def castToVariant(input: Any, dataType: DataType): VariantVal = { Review Comment: Shall we add overloaded methods for some primitive types like int, long, and double to reduce boxing overhead?
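As a rough illustration of the boxing concern, the sketch below contrasts a generic `Object` entry point with primitive overloads. It is written in Java rather than Scala for consistency with `VariantBuilder.java`, and the class `UnboxedAppendSketch` with its `append` and `trace` methods is hypothetical; it is not the API proposed or merged in this PR. In Java, overload resolution first looks for a match that needs no boxing, so a primitive argument reaches a primitive overload without allocating a wrapper object.

```java
// Hypothetical illustration of primitive overloads; not Spark code.
final class UnboxedAppendSketch {
    private final StringBuilder trace = new StringBuilder();

    // Generic path: a primitive argument would have to be boxed to get here.
    void append(Object value) {
        trace.append("boxed:").append(value).append(';');
    }

    // Primitive overloads: chosen before boxing is even considered.
    void append(long value) {
        trace.append("long:").append(value).append(';');
    }

    void append(double value) {
        trace.append("double:").append(value).append(';');
    }

    // Records which overload handled each call, in order.
    String trace() {
        return trace.toString();
    }

    public static void main(String[] args) {
        UnboxedAppendSketch builder = new UnboxedAppendSketch();
        builder.append(5);                 // int widens to long, no boxing
        builder.append(2.5);               // double overload
        builder.append((Object) "text");   // only the generic path applies
        System.out.println(builder.trace());
    }
}
```

The trade-off raised in the reply still applies: each primitive type needs its own overload and its own call site, which is where the extra boilerplate comes from.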
Re: [PR] [SPARK-47803][SQL] Support cast to variant. [spark]
cloud-fan commented on code in PR #45989: URL: https://github.com/apache/spark/pull/45989#discussion_r1565123363 ## common/variant/src/main/java/org/apache/spark/types/variant/VariantBuilder.java: ## @@ -100,6 +105,272 @@ private Variant result() { return new Variant(Arrays.copyOfRange(writeBuffer, 0, writePos), metadata); } + public void appendString(String str) { +byte[] text = str.getBytes(StandardCharsets.UTF_8); +boolean longStr = text.length > MAX_SHORT_STR_SIZE; +checkCapacity((longStr ? 1 + U32_SIZE : 1) + text.length); +if (longStr) { + writeBuffer[writePos++] = primitiveHeader(LONG_STR); + writeLong(writeBuffer, writePos, text.length, U32_SIZE); + writePos += U32_SIZE; +} else { + writeBuffer[writePos++] = shortStrHeader(text.length); +} +System.arraycopy(text, 0, writeBuffer, writePos, text.length); +writePos += text.length; + } + + public void appendNull() { +checkCapacity(1); +writeBuffer[writePos++] = primitiveHeader(NULL); + } + + public void appendBoolean(boolean b) { +checkCapacity(1); +writeBuffer[writePos++] = primitiveHeader(b ? TRUE : FALSE); + } + + // Append a long value to the variant builder. The actual used integer type depends on the value + // range of the long value. 
+ public void appendLong(long l) { +checkCapacity(1 + 8); +if (l == (byte) l) { + writeBuffer[writePos++] = primitiveHeader(INT1); + writeLong(writeBuffer, writePos, l, 1); + writePos += 1; +} else if (l == (short) l) { + writeBuffer[writePos++] = primitiveHeader(INT2); + writeLong(writeBuffer, writePos, l, 2); + writePos += 2; +} else if (l == (int) l) { + writeBuffer[writePos++] = primitiveHeader(INT4); + writeLong(writeBuffer, writePos, l, 4); + writePos += 4; +} else { + writeBuffer[writePos++] = primitiveHeader(INT8); + writeLong(writeBuffer, writePos, l, 8); + writePos += 8; +} + } + + public void appendDouble(double d) { +checkCapacity(1 + 8); +writeBuffer[writePos++] = primitiveHeader(DOUBLE); +writeLong(writeBuffer, writePos, Double.doubleToLongBits(d), 8); +writePos += 8; + } + + // Append a decimal value to the variant builder. The caller should guarantee that its precision + // and scale fit into `MAX_DECIMAL16_PRECISION`. + public void appendDecimal(BigDecimal d) { +checkCapacity(2 + 16); +BigInteger unscaled = d.unscaledValue(); +if (d.scale() <= MAX_DECIMAL4_PRECISION && d.precision() <= MAX_DECIMAL4_PRECISION) { + writeBuffer[writePos++] = primitiveHeader(DECIMAL4); + writeBuffer[writePos++] = (byte) d.scale(); + writeLong(writeBuffer, writePos, unscaled.intValueExact(), 4); + writePos += 4; +} else if (d.scale() <= MAX_DECIMAL8_PRECISION && d.precision() <= MAX_DECIMAL8_PRECISION) { + writeBuffer[writePos++] = primitiveHeader(DECIMAL8); + writeBuffer[writePos++] = (byte) d.scale(); + writeLong(writeBuffer, writePos, unscaled.longValueExact(), 8); + writePos += 8; +} else { + assert d.scale() <= MAX_DECIMAL16_PRECISION && d.precision() <= MAX_DECIMAL16_PRECISION; + writeBuffer[writePos++] = primitiveHeader(DECIMAL16); + writeBuffer[writePos++] = (byte) d.scale(); + // `toByteArray` returns a big-endian representation. We need to copy it reversely and sign + // extend it to 16 bytes. 
+ byte[] bytes = unscaled.toByteArray(); + for (int i = 0; i < bytes.length; ++i) { +writeBuffer[writePos + i] = bytes[bytes.length - 1 - i]; + } + byte sign = (byte) (bytes[0] < 0 ? -1 : 0); + for (int i = bytes.length; i < 16; ++i) { +writeBuffer[writePos + i] = sign; + } + writePos += 16; +} + } + + public void appendDate(int daysSinceEpoch) { Review Comment: For example, `PhysicalDataType.apply` returns `PhysicalIntegerType` for `DateType`.
Re: [PR] [SPARK-47803][SQL] Support cast to variant. [spark]
cloud-fan commented on code in PR #45989: URL: https://github.com/apache/spark/pull/45989#discussion_r1565122618 ## common/variant/src/main/java/org/apache/spark/types/variant/VariantBuilder.java: ## @@ -100,6 +105,272 @@ private Variant result() { return new Variant(Arrays.copyOfRange(writeBuffer, 0, writePos), metadata); } + public void appendString(String str) { +byte[] text = str.getBytes(StandardCharsets.UTF_8); +boolean longStr = text.length > MAX_SHORT_STR_SIZE; +checkCapacity((longStr ? 1 + U32_SIZE : 1) + text.length); +if (longStr) { + writeBuffer[writePos++] = primitiveHeader(LONG_STR); + writeLong(writeBuffer, writePos, text.length, U32_SIZE); + writePos += U32_SIZE; +} else { + writeBuffer[writePos++] = shortStrHeader(text.length); +} +System.arraycopy(text, 0, writeBuffer, writePos, text.length); +writePos += text.length; + } + + public void appendNull() { +checkCapacity(1); +writeBuffer[writePos++] = primitiveHeader(NULL); + } + + public void appendBoolean(boolean b) { +checkCapacity(1); +writeBuffer[writePos++] = primitiveHeader(b ? TRUE : FALSE); + } + + // Append a long value to the variant builder. The actual used integer type depends on the value + // range of the long value. 
+ public void appendLong(long l) { +checkCapacity(1 + 8); +if (l == (byte) l) { + writeBuffer[writePos++] = primitiveHeader(INT1); + writeLong(writeBuffer, writePos, l, 1); + writePos += 1; +} else if (l == (short) l) { + writeBuffer[writePos++] = primitiveHeader(INT2); + writeLong(writeBuffer, writePos, l, 2); + writePos += 2; +} else if (l == (int) l) { + writeBuffer[writePos++] = primitiveHeader(INT4); + writeLong(writeBuffer, writePos, l, 4); + writePos += 4; +} else { + writeBuffer[writePos++] = primitiveHeader(INT8); + writeLong(writeBuffer, writePos, l, 8); + writePos += 8; +} + } + + public void appendDouble(double d) { +checkCapacity(1 + 8); +writeBuffer[writePos++] = primitiveHeader(DOUBLE); +writeLong(writeBuffer, writePos, Double.doubleToLongBits(d), 8); +writePos += 8; + } + + // Append a decimal value to the variant builder. The caller should guarantee that its precision + // and scale fit into `MAX_DECIMAL16_PRECISION`. + public void appendDecimal(BigDecimal d) { +checkCapacity(2 + 16); +BigInteger unscaled = d.unscaledValue(); +if (d.scale() <= MAX_DECIMAL4_PRECISION && d.precision() <= MAX_DECIMAL4_PRECISION) { + writeBuffer[writePos++] = primitiveHeader(DECIMAL4); + writeBuffer[writePos++] = (byte) d.scale(); + writeLong(writeBuffer, writePos, unscaled.intValueExact(), 4); + writePos += 4; +} else if (d.scale() <= MAX_DECIMAL8_PRECISION && d.precision() <= MAX_DECIMAL8_PRECISION) { + writeBuffer[writePos++] = primitiveHeader(DECIMAL8); + writeBuffer[writePos++] = (byte) d.scale(); + writeLong(writeBuffer, writePos, unscaled.longValueExact(), 8); + writePos += 8; +} else { + assert d.scale() <= MAX_DECIMAL16_PRECISION && d.precision() <= MAX_DECIMAL16_PRECISION; + writeBuffer[writePos++] = primitiveHeader(DECIMAL16); + writeBuffer[writePos++] = (byte) d.scale(); + // `toByteArray` returns a big-endian representation. We need to copy it reversely and sign + // extend it to 16 bytes. 
+ byte[] bytes = unscaled.toByteArray(); + for (int i = 0; i < bytes.length; ++i) { +writeBuffer[writePos + i] = bytes[bytes.length - 1 - i]; + } + byte sign = (byte) (bytes[0] < 0 ? -1 : 0); + for (int i = bytes.length; i < 16; ++i) { +writeBuffer[writePos + i] = sign; + } + writePos += 16; +} + } + + public void appendDate(int daysSinceEpoch) { Review Comment: I don't think we need to consider datetime at the physical level. They are logical types, and physically we just need to operate on int32/int64. ## common/variant/src/main/java/org/apache/spark/types/variant/VariantBuilder.java: ## @@ -100,6 +105,272 @@ private Variant result() { return new Variant(Arrays.copyOfRange(writeBuffer, 0, writePos), metadata); } + public void appendString(String str) { +byte[] text = str.getBytes(StandardCharsets.UTF_8); +boolean longStr = text.length > MAX_SHORT_STR_SIZE; +checkCapacity((longStr ? 1 + U32_SIZE : 1) + text.length); +if (longStr) { + writeBuffer[writePos++] = primitiveHeader(LONG_STR); + writeLong(writeBuffer, writePos, text.length, U32_SIZE); + writePos += U32_SIZE; +} else { + writeBuffer[writePos++] = shortStrHeader(text.length); +} +System.arraycopy(text, 0, writeBuffer, writePos, text.length); +writePos += text.length; + } + + public void appendNull() { +checkCapacity(1); +writeBuffer[writePos++] = primitiveHeader(NULL); + } + + public void appendBoolean(boolean b) { +