[jira] [Updated] (SPARK-38091) AvroSerializer can cause java.lang.ClassCastException at run time

2022-02-03 Thread Zhenhao Li (Jira)


 [ https://issues.apache.org/jira/browse/SPARK-38091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhenhao Li updated SPARK-38091:
---
Description: 
{{AvroSerializer}}'s implementation, at least in {{newConverter}}, is not written purely against the {{InternalRow}} and {{SpecializedGetters}} interfaces; it assumes implementation details of particular implementations.

For example, in 

{code:scala}
case (TimestampType, LONG) => avroType.getLogicalType match {
  // For backward compatibility, if the Avro type is Long and it is not logical type
  // (the `null` case), output the timestamp value as with millisecond precision.
  case null | _: TimestampMillis => (getter, ordinal) =>
    DateTimeUtils.microsToMillis(timestampRebaseFunc(getter.getLong(ordinal)))
  case _: TimestampMicros => (getter, ordinal) =>
    timestampRebaseFunc(getter.getLong(ordinal))
  case other => throw new IncompatibleSchemaException(errorPrefix +
    s"SQL type ${TimestampType.sql} cannot be converted to Avro logical type $other")
}
{code}

it assumes that the {{InternalRow}} instance encodes {{TimestampType}} as a {{java.lang.Long}}. That is true for {{UnsafeRow}} but not for {{GenericInternalRow}}.

Hence the code above ends in runtime exceptions when it is applied to an instance of {{GenericInternalRow}}, which is what the Python UDF path produces.
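
A hypothetical illustration of the contract being assumed (a sketch; the values and the {{java.time.Instant}} payload are made up, but the typed getters on a generic row really are unchecked casts of whatever object the row stores):

{code:scala}
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow

// BaseGenericInternalRow implements the typed getters by casting the
// stored object, so getLong only works if the row actually holds a Long.
val ok = new GenericInternalRow(Array[Any](1643842800000000L))
ok.getLong(0) // fine: the stored value is a boxed Long (micros since epoch)

val bad = new GenericInternalRow(Array[Any](java.time.Instant.now()))
bad.getLong(0) // java.lang.ClassCastException at run time
{code}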

I didn't get time to dig deeper than that. My impression is that Spark's optimizer turns each row into an {{UnsafeRow}}, while the Python UDF path doesn't go through the optimizer, so there each row stays a {{GenericInternalRow}}.
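
If that impression is right, the usual code paths never hit the problem because rows are projected into the unsafe format before the serializer sees them. A minimal sketch of that conversion (assuming a single {{TimestampType}} column):

{code:scala}
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
import org.apache.spark.sql.types.{StructField, StructType, TimestampType}

// Projecting a generic row through UnsafeProjection yields an UnsafeRow
// whose fields sit in the fixed binary layout the typed getters assume.
val schema = StructType(Seq(StructField("ts", TimestampType)))
val toUnsafe = UnsafeProjection.create(schema)
val generic = new GenericInternalRow(Array[Any](1643842800000000L)) // micros since epoch
val unsafe = toUnsafe(generic)
unsafe.getLong(0) // well-defined: reads the 8-byte field directly
{code}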

It would be great if someone could correct me or offer a better explanation.

 

To reproduce the issue, {{git checkout master}}, {{git cherry-pick --no-commit}} [this commit|https://github.com/Zhen-hao/spark/commit/1ffe8e8f35273b2f3529f6c2d004822f480e4c88], and run the test suite {{org.apache.spark.sql.avro.AvroSerdeSuite}}.

 

You will see runtime exceptions like the following:

{code}
- Serialize DecimalType to Avro BYTES with logical type decimal *** FAILED ***
  java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to class org.apache.spark.sql.types.Decimal (java.math.BigDecimal is in module java.base of loader 'bootstrap'; org.apache.spark.sql.types.Decimal is in unnamed module of loader 'app')
  at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getDecimal(rows.scala:45)
  at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getDecimal$(rows.scala:45)
  at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getDecimal(rows.scala:195)
  at org.apache.spark.sql.avro.AvroSerializer.$anonfun$newConverter$10(AvroSerializer.scala:136)
  at org.apache.spark.sql.avro.AvroSerializer.$anonfun$newConverter$10$adapted(AvroSerializer.scala:135)
  at org.apache.spark.sql.avro.AvroSerializer.$anonfun$newStructConverter$2(AvroSerializer.scala:283)
  at org.apache.spark.sql.avro.AvroSerializer.serialize(AvroSerializer.scala:60)
  at org.apache.spark.sql.avro.AvroSerdeSuite.$anonfun$new$5(AvroSerdeSuite.scala:82)
  at org.apache.spark.sql.avro.AvroSerdeSuite.$anonfun$new$5$adapted(AvroSerdeSuite.scala:67)
  at org.apache.spark.sql.avro.AvroSerdeSuite.$anonfun$withFieldMatchType$2(AvroSerdeSuite.scala:217)
{code}
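
The failing cast can be seen in isolation (a minimal sketch: the row holds a {{java.math.BigDecimal}} where the typed getter expects Spark's {{Decimal}}):

{code:scala}
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow

// getDecimal on a generic row is an unchecked cast of the stored object to
// org.apache.spark.sql.types.Decimal, so the mismatch only surfaces at the
// getter call, exactly as in the stack trace above.
val row = new GenericInternalRow(Array[Any](new java.math.BigDecimal("1.23")))
row.getDecimal(0, 3, 2) // java.lang.ClassCastException
{code}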

[jira] [Commented] (SPARK-38091) AvroSerializer can cause java.lang.ClassCastException at run time

2022-02-02 Thread Zhenhao Li (Jira)


[ https://issues.apache.org/jira/browse/SPARK-38091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486036#comment-17486036 ]

Zhenhao Li commented on SPARK-38091:


Can someone tell me how to make Jira render Markdown?

> AvroSerializer can cause java.lang.ClassCastException at run time
> -
>
> Key: SPARK-38091
> URL: https://issues.apache.org/jira/browse/SPARK-38091
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0, 3.0.1, 3.0.2, 3.0.3, 3.1.0, 3.1.1, 3.1.2, 3.2.0, 
> 3.2.1
>Reporter: Zhenhao Li
>Priority: Major
>  Labels: Avro, serializers






[jira] [Created] (SPARK-38091) AvroSerializer can cause java.lang.ClassCastException at run time

2022-02-02 Thread Zhenhao Li (Jira)
Zhenhao Li created SPARK-38091:
--

 Summary: AvroSerializer can cause java.lang.ClassCastException at run time
 Key: SPARK-38091
 URL: https://issues.apache.org/jira/browse/SPARK-38091
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.2.1, 3.2.0, 3.1.2, 3.1.1, 3.1.0, 3.0.3, 3.0.2, 3.0.1, 3.0.0
Reporter: Zhenhao Li





[jira] [Created] (SPARK-25315) setting "auto.offset.reset" to "earliest" has no effect in Structured Streaming with Spark 2.3.1 and Kafka 1.0

2018-09-03 Thread Zhenhao Li (JIRA)
Zhenhao Li created SPARK-25315:
--

 Summary: setting "auto.offset.reset" to "earliest" has no effect in Structured Streaming with Spark 2.3.1 and Kafka 1.0
 Key: SPARK-25315
 URL: https://issues.apache.org/jira/browse/SPARK-25315
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.3.1
 Environment: Standalone; running in IDEA
Reporter: Zhenhao Li


The following code won't read from the beginning of the topic:

{code:scala}
val kafkaOptions = Map[String, String](
  "kafka.bootstrap.servers" -> KAFKA_BOOTSTRAP_SERVERS,
  "subscribe" -> TOPIC,
  "group.id" -> GROUP_ID,
  "auto.offset.reset" -> "earliest"
)

val myStream = sparkSession
  .readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

myStream
  .writeStream
  .format("console")
  .start()
  .awaitTermination()
{code}
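
For comparison, the Kafka source in Structured Streaming ignores plain consumer options such as {{auto.offset.reset}} and exposes its own {{startingOffsets}} option instead. A sketch of reading from the earliest offsets that way (assuming the same placeholder constants as above):

{code:scala}
// Sketch under the assumption that the goal is to consume from the
// earliest available offsets: "startingOffsets" is the option the
// Kafka source honors, not Kafka's "auto.offset.reset".
val fromEarliest = sparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
  .option("subscribe", TOPIC)
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
{code}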


