Which Scala version are you using? Is it Scala 2.10? Scala 2.10 has some known race conditions in reflection, and the Scala community doesn't have a plan to fix them (http://docs.scala-lang.org/overviews/reflection/thread-safety.html). AFAIK, the only way to fix this is to upgrade to Scala 2.11.
On Wed, Nov 16, 2016 at 11:16 AM, shyla deshpande <deshpandesh...@gmail.com> wrote: > I am using protobuf to encode. This may not be related to the new release > issue.... > > Exception in thread "main" scala.ScalaReflectionException: <none> is not > a term > at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199) > at scala.reflect.internal.Symbols$SymbolContextApiImpl. > asTerm(Symbols.scala:84) > at org.apache.spark.sql.catalyst.ScalaReflection$class.constructParams( > ScalaReflection.scala:811) > at org.apache.spark.sql.catalyst.ScalaReflection$.constructParams( > ScalaReflection.scala:39) > at org.apache.spark.sql.catalyst.ScalaReflection$class. > getConstructorParameters(ScalaReflection.scala:800) > at org.apache.spark.sql.catalyst.ScalaReflection$. > getConstructorParameters(ScalaReflection.scala:39) > at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$ > spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection. > scala:582) > at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$ > spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection. > scala:460) > at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9. > apply(ScalaReflection.scala:592) > at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9. > apply(ScalaReflection.scala:583) > at scala.collection.TraversableLike$$anonfun$flatMap$1.apply( > TraversableLike.scala:252) > at scala.collection.TraversableLike$$anonfun$flatMap$1.apply( > TraversableLike.scala:252) > at scala.collection.immutable.List.foreach(List.scala:381) > at scala.collection.TraversableLike$class.flatMap( > TraversableLike.scala:252) > at scala.collection.immutable.List.flatMap(List.scala:344) > at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$ > spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection. > scala:583) > at org.apache.spark.sql.catalyst.ScalaReflection$. 
> serializerFor(ScalaReflection.scala:425) > at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$. > apply(ExpressionEncoder.scala:61) > at org.apache.spark.sql.Encoders$.product(Encoders.scala:274) > at org.apache.spark.sql.SQLImplicits.newProductEncoder( > SQLImplicits.scala:47) > at PersonConsumer$.main(PersonConsumer.scala:33) > at PersonConsumer.main(PersonConsumer.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at sun.reflect.NativeMethodAccessorImpl.invoke( > NativeMethodAccessorImpl.java:62) > at sun.reflect.DelegatingMethodAccessorImpl.invoke( > DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) > > The following is my code ... > > object PersonConsumer { > import org.apache.spark.rdd.RDD > import com.trueaccord.scalapb.spark._ > import org.apache.spark.sql.{SQLContext, SparkSession} > import com.example.protos.demo._ > > def main(args : Array[String]) { > > def parseLine(s: String): Person = > Person.parseFrom( > org.apache.commons.codec.binary.Base64.decodeBase64(s)) > > val spark = SparkSession.builder. > master("local") > .appName("spark session example") > .getOrCreate() > > import spark.implicits._ > > val ds1 = > spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","person").load() > > val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String] > > val ds3 = ds2.map(str => > parseLine(str)).createOrReplaceTempView("persons") > > val ds4 = spark.sqlContext.sql("select name from persons") > > val query = ds4.writeStream > .outputMode("append") > .format("console") > .start() > query.awaitTermination() > } > } > >