[jira] [Updated] (SPARK-38091) AvroSerializer can cause java.lang.ClassCastException at run time
[ https://issues.apache.org/jira/browse/SPARK-38091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenhao Li updated SPARK-38091: --- Description: {{AvroSerializer}}'s implementation, at least in {{newConverter}}, is not based purely on the {{InternalRow}} and {{SpecializedGetters}} interfaces. It assumes many implementation details of the interface. For example, in
{code}
case (TimestampType, LONG) => avroType.getLogicalType match {
  // For backward compatibility, if the Avro type is Long and it is not a logical type
  // (the `null` case), output the timestamp value with millisecond precision.
  case null | _: TimestampMillis => (getter, ordinal) =>
    DateTimeUtils.microsToMillis(timestampRebaseFunc(getter.getLong(ordinal)))
  case _: TimestampMicros => (getter, ordinal) =>
    timestampRebaseFunc(getter.getLong(ordinal))
  case other => throw new IncompatibleSchemaException(errorPrefix +
    s"SQL type ${TimestampType.sql} cannot be converted to Avro logical type $other")
}
{code}
it assumes the {{InternalRow}} instance encodes {{TimestampType}} as {{java.lang.Long}}. That is true for {{UnsafeRow}} but not for {{GenericInternalRow}}. Hence the code above can throw runtime exceptions when used on an instance of {{GenericInternalRow}}, which is the case for Python UDFs. I didn't have time to dig deeper than that. My impression is that Spark's optimizer turns rows into {{UnsafeRow}}, while the Python UDF path bypasses it, so each row stays a {{GenericInternalRow}}. It would be great if someone could correct me or offer a better explanation. To reproduce the issue, {{git checkout master}}, {{git cherry-pick --no-commit}} [this-commit|https://github.com/Zhen-hao/spark/commit/1ffe8e8f35273b2f3529f6c2d004822f480e4c88], and run the test suite {{org.apache.spark.sql.avro.AvroSerdeSuite}}.
You will see runtime exceptions like the following one:
{code}
- Serialize DecimalType to Avro BYTES with logical type decimal *** FAILED ***
  java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to class org.apache.spark.sql.types.Decimal (java.math.BigDecimal is in module java.base of loader 'bootstrap'; org.apache.spark.sql.types.Decimal is in unnamed module of loader 'app')
  at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getDecimal(rows.scala:45)
  at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getDecimal$(rows.scala:45)
  at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getDecimal(rows.scala:195)
  at org.apache.spark.sql.avro.AvroSerializer.$anonfun$newConverter$10(AvroSerializer.scala:136)
  at org.apache.spark.sql.avro.AvroSerializer.$anonfun$newConverter$10$adapted(AvroSerializer.scala:135)
  at org.apache.spark.sql.avro.AvroSerializer.$anonfun$newStructConverter$2(AvroSerializer.scala:283)
  at org.apache.spark.sql.avro.AvroSerializer.serialize(AvroSerializer.scala:60)
  at org.apache.spark.sql.avro.AvroSerdeSuite.$anonfun$new$5(AvroSerdeSuite.scala:82)
  at org.apache.spark.sql.avro.AvroSerdeSuite.$anonfun$new$5$adapted(AvroSerdeSuite.scala:67)
  at org.apache.spark.sql.avro.AvroSerdeSuite.$anonfun$withFieldMatchType$2(AvroSerdeSuite.scala:217)
{code}
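The failure mode can be illustrated outside Spark with a minimal, self-contained sketch. The class below mirrors the shape of {{SpecializedGetters.getLong}} but is NOT Spark's real code: a generic row stores each field boxed as {{Any}}, so a getter that casts blindly fails at run time when the slot holds a different boxed type.

```scala
// Minimal sketch, not Spark's real classes: a generic row stores each field
// boxed as Any, so a getter that blindly casts can fail at run time — the
// same pattern behind the reported ClassCastException.
class GenericRow(values: Array[Any]) {
  // Mirrors the shape of SpecializedGetters.getLong: assumes the slot holds a Long.
  def getLong(ordinal: Int): Long = values(ordinal).asInstanceOf[Long]
}

def safeGetLong(row: GenericRow, ordinal: Int): Option[Long] =
  try Some(row.getLong(ordinal))
  catch { case _: ClassCastException => None }

// A slot that really holds a Long works; a slot holding another boxed type
// (as a row built from external objects might) throws ClassCastException.
val micros = new GenericRow(Array[Any](1645000000000000L))
val ts = new GenericRow(Array[Any](java.sql.Timestamp.valueOf("2022-02-16 10:00:00")))
println(safeGetLong(micros, 0)) // Some(1645000000000000)
println(safeGetLong(ts, 0))     // None: the cast to Long fails
```

A converter written purely against the interface would have to go through the declared accessor for the SQL type (e.g. {{getDecimal}}, {{getLong}}) and tolerate whatever representation the concrete row uses, rather than assuming the {{UnsafeRow}} encoding.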
[jira] [Commented] (SPARK-38091) AvroSerializer can cause java.lang.ClassCastException at run time
[ https://issues.apache.org/jira/browse/SPARK-38091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17486036#comment-17486036 ] Zhenhao Li commented on SPARK-38091: Can someone tell me how to let Jira render markdown?
> AvroSerializer can cause java.lang.ClassCastException at run time
> -
> Key: SPARK-38091
> URL: https://issues.apache.org/jira/browse/SPARK-38091
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.0.0, 3.0.1, 3.0.2, 3.0.3, 3.1.0, 3.1.1, 3.1.2, 3.2.0, 3.2.1
> Reporter: Zhenhao Li
> Priority: Major
> Labels: Avro, serializers
-- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-38091) AvroSerializer can cause java.lang.ClassCastException at run time
Zhenhao Li created SPARK-38091: -- Summary: AvroSerializer can cause java.lang.ClassCastException at run time Key: SPARK-38091 URL: https://issues.apache.org/jira/browse/SPARK-38091 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.2.1, 3.2.0, 3.1.2, 3.1.1, 3.1.0, 3.0.3, 3.0.2, 3.0.1, 3.0.0 Reporter: Zhenhao Li
[jira] [Created] (SPARK-25315) setting "auto.offset.reset" to "earliest" has no effect in Structured Streaming with Spark 2.3.1 and Kafka 1.0
Zhenhao Li created SPARK-25315: -- Summary: setting "auto.offset.reset" to "earliest" has no effect in Structured Streaming with Spark 2.3.1 and Kafka 1.0 Key: SPARK-25315 URL: https://issues.apache.org/jira/browse/SPARK-25315 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.3.1 Environment: Standalone; running in IDEA Reporter: Zhenhao Li The following code won't read from the beginning of the topic:
{code:java}
val kafkaOptions = Map[String, String](
  "kafka.bootstrap.servers" -> KAFKA_BOOTSTRAP_SERVERS,
  "subscribe" -> TOPIC,
  "group.id" -> GROUP_ID,
  "auto.offset.reset" -> "earliest"
)

val myStream = sparkSession
  .readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

myStream
  .writeStream
  .format("console")
  .start()
  .awaitTermination()
{code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
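A likely explanation, per Spark's Kafka integration guide: the Structured Streaming Kafka source manages offsets itself and does not honor the consumer options {{auto.offset.reset}} and {{group.id}}; the supported way to read from the beginning of the topic is the source option {{startingOffsets}}. A sketch of the corrected options map ({{KAFKA_BOOTSTRAP_SERVERS}} and {{TOPIC}} are the placeholders from the report, filled with example values):

```scala
// The Structured Streaming Kafka source tracks offsets itself, so the plain
// Kafka consumer options "auto.offset.reset" and "group.id" are not honored.
// The documented knob is the source option "startingOffsets".
val KAFKA_BOOTSTRAP_SERVERS = "localhost:9092" // example value for the report's placeholder
val TOPIC = "my-topic"                         // example value for the report's placeholder

val kafkaOptions = Map[String, String](
  "kafka.bootstrap.servers" -> KAFKA_BOOTSTRAP_SERVERS,
  "subscribe" -> TOPIC,
  "startingOffsets" -> "earliest" // read from the beginning of the topic
)
```

The rest of the query (reading with {{.options(kafkaOptions)}} and writing to the console sink) stays as in the report.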