Which Scala version are you using? Is it Scala 2.10? Scala 2.10 has some
known race conditions in reflection, and the Scala community doesn't plan
to fix them (
http://docs.scala-lang.org/overviews/reflection/thread-safety.html). AFAIK,
the only way to fix it is to upgrade to Scala 2.11.

On Wed, Nov 16, 2016 at 11:16 AM, shyla deshpande <deshpandesh...@gmail.com>
wrote:

> I am using protobuf to encode. This may not be related to the new release
> issue....
>
> Exception in thread "main" scala.ScalaReflectionException: <none> is not
> a term
> at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199)
> at scala.reflect.internal.Symbols$SymbolContextApiImpl.
> asTerm(Symbols.scala:84)
> at org.apache.spark.sql.catalyst.ScalaReflection$class.constructParams(
> ScalaReflection.scala:811)
> at org.apache.spark.sql.catalyst.ScalaReflection$.constructParams(
> ScalaReflection.scala:39)
> at org.apache.spark.sql.catalyst.ScalaReflection$class.
> getConstructorParameters(ScalaReflection.scala:800)
> at org.apache.spark.sql.catalyst.ScalaReflection$.
> getConstructorParameters(ScalaReflection.scala:39)
> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$
> spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.
> scala:582)
> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$
> spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.
> scala:460)
> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.
> apply(ScalaReflection.scala:592)
> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.
> apply(ScalaReflection.scala:583)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(
> TraversableLike.scala:252)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(
> TraversableLike.scala:252)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.TraversableLike$class.flatMap(
> TraversableLike.scala:252)
> at scala.collection.immutable.List.flatMap(List.scala:344)
> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$
> spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.
> scala:583)
> at org.apache.spark.sql.catalyst.ScalaReflection$.
> serializerFor(ScalaReflection.scala:425)
> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.
> apply(ExpressionEncoder.scala:61)
> at org.apache.spark.sql.Encoders$.product(Encoders.scala:274)
> at org.apache.spark.sql.SQLImplicits.newProductEncoder(
> SQLImplicits.scala:47)
> at PersonConsumer$.main(PersonConsumer.scala:33)
> at PersonConsumer.main(PersonConsumer.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>
> The following is my code ...
>
> object PersonConsumer {
>   import org.apache.spark.rdd.RDD
>   import com.trueaccord.scalapb.spark._
>   import org.apache.spark.sql.{SQLContext, SparkSession}
>   import com.example.protos.demo._
>
>   def main(args : Array[String]) {
>
>     def parseLine(s: String): Person =
>       Person.parseFrom(
>         org.apache.commons.codec.binary.Base64.decodeBase64(s))
>
>     val spark = SparkSession.builder.
>       master("local")
>       .appName("spark session example")
>       .getOrCreate()
>
>     import spark.implicits._
>
>     val ds1 = 
> spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","person").load()
>
>     val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String]
>
>     val ds3 = ds2.map(str => 
> parseLine(str)).createOrReplaceTempView("persons")
>
>     val ds4 = spark.sqlContext.sql("select name from persons")
>
>     val query = ds4.writeStream
>       .outputMode("append")
>       .format("console")
>       .start()
>     query.awaitTermination()
>   }
> }
>
>

Reply via email to