[ https://issues.apache.org/jira/browse/LIVY-720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16993294#comment-16993294 ]

Stephen Jenkins commented on LIVY-720:
--------------------------------------

Hi [~runzhiwang]. Thanks for looking into the issue. Apologies, I was 
experimenting with Scala versions while testing this and had left it at 
2.11.8; however, I had also tried 2.11.12 with no luck.

I noticed that the simplified example in your attached code does not read from 
HDFS. This part is key:


{code:scala}
val csv: DataFrame = ss
  .read
  .option("header", "true")
  .option("mode", "DROPMALFORMED")
  .csv(filePath)
{code}

That read is where the exception is thrown. Here is where it originates: 
[https://github.com/apache/spark/blob/v2.3.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala#L45]
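
For context, the method at that line builds a runtime mirror from the current 
thread's context classloader. In Spark 2.3.0 it looks roughly like this 
(paraphrased from the linked source, not verbatim):

{code:scala}
// org.apache.spark.sql.catalyst.ScalaReflection (Spark 2.3.0), approximately:
// a fresh runtime mirror is created on every call, using the context
// classloader of whichever thread happens to run the encoder code.
override def mirror: universe.Mirror = {
  universe.runtimeMirror(Thread.currentThread().getContextClassLoader)
}
{code}

So my guess (speculation on my part, but consistent with the trace) is that 
the thread Livy's RSC driver uses to run the job carries a context classloader 
that can't cleanly resolve the Scala library, and scala-reflect then fails 
while initializing its package symbols, hence the `head of empty list` coming 
out of `openPackageModule`.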

 

> NoSuchElementException caused when reading from HDFS, submitted via Livy 
> programmatic API
> -----------------------------------------------------------------------------------------
>
>                 Key: LIVY-720
>                 URL: https://issues.apache.org/jira/browse/LIVY-720
>             Project: Livy
>          Issue Type: Bug
>          Components: RSC
>    Affects Versions: 0.6.0
>         Environment: Using a docker container on windows 10: 
> https://hub.docker.com/r/cheathwood/hadoop-spark-livy
>            Reporter: Stephen Jenkins
>            Priority: Blocker
>         Attachments: FailingLivySparkJob.zip
>
>
> Hi,
>  
> I've been using the Livy programmatic API to submit Spark jobs written in 
> Scala, and I've run into a strange issue. I'm using case classes to wrap the 
> parameters I want to send over to Spark, then within the job I manipulate 
> them for use in different parts of the job. However, it seems that whenever 
> I try to read and collect data from HDFS I get the following error:
> {code:java}
> java.util.NoSuchElementException: head of empty list
>       at scala.collection.immutable.Nil$.head(List.scala:420)
>       at scala.collection.immutable.Nil$.head(List.scala:417)
>       at scala.collection.immutable.List.map(List.scala:277)
>       at scala.reflect.internal.Symbols$Symbol.parentSymbols(Symbols.scala:2117)
>       at scala.reflect.internal.SymbolTable.openPackageModule(SymbolTable.scala:301)
>       at scala.reflect.internal.SymbolTable.openPackageModule(SymbolTable.scala:341)
>       at scala.reflect.runtime.SymbolLoaders$LazyPackageType$$anonfun$complete$2.apply$mcV$sp(SymbolLoaders.scala:74)
>       at scala.reflect.runtime.SymbolLoaders$LazyPackageType$$anonfun$complete$2.apply(SymbolLoaders.scala:71)
>       at scala.reflect.runtime.SymbolLoaders$LazyPackageType$$anonfun$complete$2.apply(SymbolLoaders.scala:71)
>       at scala.reflect.internal.SymbolTable.slowButSafeEnteringPhaseNotLaterThan(SymbolTable.scala:263)
>       at scala.reflect.runtime.SymbolLoaders$LazyPackageType.complete(SymbolLoaders.scala:71)
>       at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1514)
>       at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(SynchronizedSymbols.scala:174)
>       at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
>       at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
>       at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
>       at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
>       at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:123)
>       at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:174)
>       at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.info(SynchronizedSymbols.scala:127)
>       at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1.info(SynchronizedSymbols.scala:174)
>       at scala.reflect.internal.Types$TypeRef.thisInfo(Types.scala:2194)
>       at scala.reflect.internal.Types$TypeRef.baseClasses(Types.scala:2199)
>       at scala.reflect.internal.tpe.FindMembers$FindMemberBase.<init>(FindMembers.scala:17)
>       at scala.reflect.internal.tpe.FindMembers$FindMember.<init>(FindMembers.scala:219)
>       at scala.reflect.internal.Types$Type.scala$reflect$internal$Types$Type$$findMemberInternal$1(Types.scala:1014)
>       at scala.reflect.internal.Types$Type.findMember(Types.scala:1016)
>       at scala.reflect.internal.Types$Type.memberBasedOnName(Types.scala:631)
>       at scala.reflect.internal.Types$Type.member(Types.scala:600)
>       at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:48)
>       at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:66)
>       at scala.reflect.internal.Mirrors$RootsBase.staticPackage(Mirrors.scala:204)
>       at scala.reflect.runtime.JavaMirrors$JavaMirror.staticPackage(JavaMirrors.scala:82)
>       at scala.reflect.internal.Mirrors$RootsBase.init(Mirrors.scala:263)
>       at scala.reflect.runtime.JavaMirrors$class.scala$reflect$runtime$JavaMirrors$$createMirror(JavaMirrors.scala:32)
>       at scala.reflect.runtime.JavaMirrors$$anonfun$runtimeMirror$1.apply(JavaMirrors.scala:49)
>       at scala.reflect.runtime.JavaMirrors$$anonfun$runtimeMirror$1.apply(JavaMirrors.scala:47)
>       at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
>       at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
>       at scala.reflect.runtime.JavaMirrors$class.runtimeMirror(JavaMirrors.scala:46)
>       at scala.reflect.runtime.JavaUniverse.runtimeMirror(JavaUniverse.scala:16)
>       at scala.reflect.runtime.JavaUniverse.runtimeMirror(JavaUniverse.scala:16)
>       at org.apache.spark.sql.catalyst.ScalaReflection$.mirror(ScalaReflection.scala:45)
>       at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:48)
>       at org.apache.spark.sql.Encoders$.STRING(Encoders.scala:96)
>       at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.createBaseDataset(CSVDataSource.scala:189)
>       at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.infer(CSVDataSource.scala:147)
>       at org.apache.spark.sql.execution.datasources.csv.CSVDataSource.inferSchema(CSVDataSource.scala:63)
>       at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.inferSchema(CSVFileFormat.scala:57)
>       at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:202)
>       at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:202)
>       at scala.Option.orElse(Option.scala:289)
>       at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:201)
>       at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:392)
>       at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
>       at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
>       at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:594)
>       at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:473)
>       at test.FailingJob.theStuffToDo(FailingJob.scala:28)
>       at test.FailingJob.call(FailingJob.scala:11)
>       at test.Service$$anonfun$testProvider$2$$anonfun$apply$1.apply(Service.scala:22)
>       at test.Service$$anonfun$testProvider$2$$anonfun$apply$1.apply(Service.scala:22)
>       at org.apache.livy.scalaapi.LivyScalaClient$$anon$1.call(LivyScalaClient.scala:54)
>       at org.apache.livy.rsc.driver.BypassJob.call(BypassJob.java:40)
>       at org.apache.livy.rsc.driver.BypassJob.call(BypassJob.java:27)
>       at org.apache.livy.rsc.driver.JobWrapper.call(JobWrapper.java:64)
>       at org.apache.livy.rsc.driver.BypassJobWrapper.call(BypassJobWrapper.java:45)
>       at org.apache.livy.rsc.driver.BypassJobWrapper.call(BypassJobWrapper.java:27)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
>   
> As you can see, it's a Scala reflection error that fails while resolving 
> class definitions, which makes me think it has something to do with the 
> Kryo serialization/deserialization. I've managed to isolate the issue and 
> reproduce it in a small [example|https://github.com/steviej08/FailingLivySparkJob].
> It doesn't seem to matter what I do with the collection within the function, 
> but it must be used. As soon as the job reads the CSV, it fails. The key 
> thing is that `someSeq` is of type `Seq[String]`; it seems to work when I 
> pass it in as an `Array[String]`. If I pass the param in but never reference 
> it, it also does not fail.
> {code:java}
> // referencing the Seq[String] parameter is what triggers the failure
> val mappedArray = someSeq.map(s => s.toUpperCase())
> val ss: SparkSession = scalaJobContext.sparkSession
> // this read throws the NoSuchElementException above
> val csv: DataFrame = ss
>   .read
>   .option("header", "true")
>   .option("mode", "DROPMALFORMED")
>   .csv(filePath)
> println(csv.count())
> {code}
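> For reference, the job is submitted roughly like this (a minimal sketch; the 
> names, URL, and wrapper class here are illustrative, not the exact ones from 
> the linked repo):
> {code:java}
> import java.net.URI
> import org.apache.livy.LivyClientBuilder
> import org.apache.livy.scalaapi._
> import org.apache.spark.sql.SparkSession
> 
> // hypothetical parameter wrapper; the important part is the Seq[String] field
> case class JobParams(someSeq: Seq[String], filePath: String)
> 
> val client = new LivyClientBuilder()
>   .setURI(new URI("http://livy-server:8998"))
>   .build()
>   .asScalaClient
> 
> val params = JobParams(Seq("a", "b"), "hdfs:///data/test.csv")
> 
> // the closure runs inside the RSC driver; merely using params.someSeq there
> // is enough to trigger the reflection failure once the CSV is read
> val handle = client.submit { context =>
>   val mapped = params.someSeq.map(_.toUpperCase)
>   val ss: SparkSession = context.sparkSession
>   ss.read
>     .option("header", "true")
>     .option("mode", "DROPMALFORMED")
>     .csv(params.filePath)
>     .count()
> }
> {code}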
> I appreciate that the exception is being thrown within Spark, but I can't 
> seem to reproduce it without going through the Livy RPC programmatic API.
> Other things to note: I am using Spark `2.3`, but I have tried upgrading to 
> `2.4` with no luck. I have also tried generating the function from an 
> object, without using a case class, again with no luck. Referencing the 
> value as a property on an object did work, but that is no good for me, as I 
> need (well, would much prefer) to pass in a parameter.
> I know that I can just use an Array, but that means I'd need to transform 
> all my objects for transmission, which is a bit of a pain. I'm also still 
> struggling to get that transformation working in production, even though it 
> works within my small example, but I am working on that currently.
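> In case it's useful, the Array workaround I mean looks roughly like this (a 
> minimal sketch; `JobParams` is the hypothetical wrapper from the sketch 
> above):
> {code:java}
> // Array-based twin of the wrapper, which does not trigger the failure
> case class JobParamsForTransmission(someSeq: Array[String], filePath: String)
> 
> // convert at the boundary, just before handing the parameters to the job
> def forTransmission(p: JobParams): JobParamsForTransmission =
>   JobParamsForTransmission(p.someSeq.toArray, p.filePath)
> {code}
> Doing that for every wrapper type is the transformation I'd rather avoid.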
>  
> Any help or insight on this would be amazing.
>  
> Thanks,
> Stephen



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
