Run Spark on Java 10

2018-09-28 Thread Ben_W
*my use case*: We run a Spark cluster on Mesos. Since our Mesos cluster also
hosts other frameworks such as Storm and Cassandra, we have had incidents
where a Spark job over-utilized CPU and caused resource contention with the
other frameworks.

*objective*: run an un-modularized Spark application (the jar is compiled with a
Java 8-compatible sbt setup) on Java 10 in order to leverage the Java 10 container
support. Related link: https://bugs.openjdk.java.net/browse/JDK-8146115

After doing some reading about Java 9, this was my imagined *happy path*:

1) Point JAVA_HOME to Java 10.
2) Run my Spark job and resolve each ClassNotFoundException by looking up the
missing module in the Oracle documentation. For example, if the java.sql module
(https://docs.oracle.com/javase/9/docs/api/java.sql-summary.html) is missing,
add "spark.executor.extraJavaOptions --add-modules java.se.ee" to
conf/spark-defaults.conf.
3) Repeat step 2 until no more exceptions are thrown.

However, I found this:

Warning: Local jar ***/java.se.ee does not exist, skipping.

java.lang.ClassNotFoundException: com.**.spark.Main
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:466)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:566)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:499)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:374)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:233)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:732)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


This is what I observed after turning on verbose mode:

...

Using properties file:
/tmp/spark-2.2.2-bin-hadoop2.6/conf/spark-defaults.conf

Adding default property: spark.eventLog.enabled=true
Adding default property: spark.eventLog.dir=hdfs://***
Adding default property: spark.executor.extraJavaOptions=--add-modules java.se.ee

Parsed arguments:

master mesos://localhost:10017
deployMode cluster
executorMemory 16G
executorCores   2
totalExecutorCores 50
propertiesFile /tmp/spark-2.2.2-bin-hadoop2.6/conf/spark-defaults.conf
driverMemory   4G
driverCores 1
driverExtraClassPath   null
driverExtraLibraryPath null
driverExtraJavaOptions null
supervise   false
queue   null
numExecutors   null
files   null
pyFiles null
archives   null
mainClass   com.**.spark.Main
primaryResource ***.jar
name   ***
childArgs   [***]
jars   null
packages   null
packagesExclusions null
repositories   null
verbose true

Spark properties used, including those specified through
--conf and those from the properties file
/tmp/spark-2.2.2-bin-hadoop2.6/conf/spark-defaults.conf:
(spark.mesos.uris,hdfs:///***/tmpqIx6x2)
(spark.driver.memory,4G)
(spark.eventLog.enabled,true)
(spark.executor.extraJavaOptions,--add-modules java.se.ee)
(spark.executor.uri,***/spark-2.2.2-bin-hadoop2.6.tgz)
(spark.eventLog.dir,hdfs://***)

*The warning was printed here:*

https://github.com/apache/spark/blob/5264164a67df498b73facae207eda12ee133be7d/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala#L101

https://github.com/apache/spark/blob/5264164a67df498b73facae207eda12ee133be7d/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala#L76

After reading the source code, it seems to me that spark-submit does not
understand the --add-modules option, so it treats java.se.ee as a jar file rather
than a module. And I couldn't make it translate --add-modules the way I want
when launching the executor JVM.
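
One untested idea on my side (an assumption, not something I have verified against
the Spark source): set the option as a single token via SparkConf, so that whitespace
splitting cannot leave java.se.ee looking like a jar path:

// Sketch only: spark.executor.extraJavaOptions and spark.driver.extraJavaOptions are
// real Spark settings, but whether the "=" form avoids the jar mis-parsing is untested.
val conf = new org.apache.spark.SparkConf()
  .set("spark.executor.extraJavaOptions", "--add-modules=java.se.ee")
  .set("spark.driver.extraJavaOptions", "--add-modules=java.se.ee")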

Has anyone done similar experiments running Spark on Java 9/10?

Thanks in advance






Re: Text from pdf spark

2018-09-28 Thread Joel D
Yes, I can access the file using cli.

On Fri, Sep 28, 2018 at 1:24 PM kathleen li  wrote:

> The error message is “file not found”
> Are you able to use the following command line to assess the file with the
> user you submitted the job?
> hdfs dfs -ls /tmp/sample.pdf
>
> Sent from my iPhone
>
> On Sep 28, 2018, at 12:10 PM, Joel D  wrote:
>
> I'm trying to extract text from pdf files in hdfs using pdfBox.
>
> However it throws an error:
>
> "Exception in thread "main" org.apache.spark.SparkException: ...
>
> java.io.FileNotFoundException: /nnAlias:8020/tmp/sample.pdf
>
> (No such file or directory)"
>
>
>
>
> What am I missing? Should I be working with PortableDataStream instead of
> the string part of:
>
> val files: RDD[(String, PortableDataStream)]?
>
> def pdfRead(fileNameFromRDD: (String, PortableDataStream), sparkSession:
> SparkSession) = {
>
> val file: File = new File(fileNameFromRDD._1.drop(5))
>
> val document = PDDocument.load(file); //It throws an error here.
>
>
> if (!document.isEncrypted()) {
>
>   val stripper = new PDFTextStripper()
>
>   val text = stripper.getText(document)
>
>   println("Text:" + text)
>
>
> }
>
> document.close()
>
>
>   }
>
>
> //This is where I call the above pdf to text converter method.
>
>  val files =
> sparkSession.sparkContext.binaryFiles("hdfs://nnAlias:8020/tmp/sample.pdf")
>
> files.foreach(println)
>
>
> files.foreach(f => println(f._1))
>
>
> files.foreach(fileStream => pdfRead(fileStream, sparkSession))
>
>
> Thanks.
>
>
>
>
>
>
>
>


Re: Text from pdf spark

2018-09-28 Thread kathleen li
The error message is “file not found”
Are you able to use the following command line to access the file with the user
you submitted the job as?
hdfs dfs -ls /tmp/sample.pdf

Sent from my iPhone

> On Sep 28, 2018, at 12:10 PM, Joel D  wrote:
> 
> I'm trying to extract text from pdf files in hdfs using pdfBox. 
> However it throws an error:
> 
> "Exception in thread "main" org.apache.spark.SparkException: ...
> java.io.FileNotFoundException: /nnAlias:8020/tmp/sample.pdf 
> (No such file or directory)"
> 
> 
> 
> What am I missing? Should I be working with PortableDataStream instead of the 
> string part of:
> val files: RDD[(String, PortableDataStream)]?
> def pdfRead(fileNameFromRDD: (String, PortableDataStream), sparkSession: 
> SparkSession) = {
> val file: File = new File(fileNameFromRDD._1.drop(5))
> val document = PDDocument.load(file); //It throws an error here.
> 
> if (!document.isEncrypted()) {
>   val stripper = new PDFTextStripper()
>   val text = stripper.getText(document)
>   println("Text:" + text)
> 
> }
> document.close()
> 
>   }
> 
> //This is where I call the above pdf to text converter method.
>  val files = 
> sparkSession.sparkContext.binaryFiles("hdfs://nnAlias:8020/tmp/sample.pdf")
> files.foreach(println)
> 
> files.foreach(f => println(f._1))
> 
> files.foreach(fileStream => pdfRead(fileStream, sparkSession))
> 
> Thanks.
> 
> 
> 
> 
> 
> 


Text from pdf spark

2018-09-28 Thread Joel D
I'm trying to extract text from pdf files in hdfs using pdfBox.

However it throws an error:

"Exception in thread "main" org.apache.spark.SparkException: ...

java.io.FileNotFoundException: /nnAlias:8020/tmp/sample.pdf

(No such file or directory)"




What am I missing? Should I be working with the PortableDataStream instead of
the string part of val files: RDD[(String, PortableDataStream)]?

def pdfRead(fileNameFromRDD: (String, PortableDataStream), sparkSession: SparkSession) = {
  val file: File = new File(fileNameFromRDD._1.drop(5))
  val document = PDDocument.load(file) // It throws an error here.

  if (!document.isEncrypted()) {
    val stripper = new PDFTextStripper()
    val text = stripper.getText(document)
    println("Text:" + text)
  }
  document.close()
}

// This is where I call the above pdf-to-text converter method.
val files = sparkSession.sparkContext.binaryFiles("hdfs://nnAlias:8020/tmp/sample.pdf")
files.foreach(println)
files.foreach(f => println(f._1))
files.foreach(fileStream => pdfRead(fileStream, sparkSession))
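
In case it clarifies what I mean by that, here is a rough sketch of reading through
the PortableDataStream instead of constructing a local File (untested; it assumes
PDFBox 2.x package names):

import java.io.InputStream
import org.apache.pdfbox.pdmodel.PDDocument
import org.apache.pdfbox.text.PDFTextStripper
import org.apache.spark.input.PortableDataStream

def pdfReadFromStream(file: (String, PortableDataStream)): Unit = {
  // Stream the bytes from HDFS instead of re-opening the path as a local file.
  val in: InputStream = file._2.open()
  val document = PDDocument.load(in) // PDFBox can also load from an InputStream
  try {
    if (!document.isEncrypted()) {
      val text = new PDFTextStripper().getText(document)
      println("Text of " + file._1 + ": " + text)
    }
  } finally {
    document.close()
    in.close()
  }
}

// called the same way as above:
// files.foreach(pdfReadFromStream)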


Thanks.


Re: Need to convert Dataset to HashMap

2018-09-28 Thread rishmanisation
Thanks for the help so far. I tried caching, but the operation seems to take
forever. Any tips on how I can speed it up?

Also, I am not sure a case class would work, since different files have
different structures (I am parsing a 1 GB file right now, but there are a few
other files that I also need to run this on).






Re: [Spark SQL] why spark sql hash() are returns the same hash value though the keys/expr are not same

2018-09-28 Thread Thakrar, Jayesh
Not sure I get what you mean….

I ran the query that you had – and don’t get the same hash as you.
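
For what it's worth, hash() is Spark's 32-bit Murmur3 hash, so collisions are always
possible over a large key space. If that is a concern, one heavier but much wider
alternative is sha2(); a quick sketch in the same spark-shell style:

// sha2() returns a hex digest (256 bits here) instead of a 32-bit int,
// so accidental collisions are practically ruled out, at extra compute cost.
spark.sql("select sha2('40514X', 256), sha2('41751', 256)").show(false)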


From: Gokula Krishnan D 
Date: Friday, September 28, 2018 at 10:40 AM
To: "Thakrar, Jayesh" 
Cc: user 
Subject: Re: [Spark SQL] why spark sql hash() are returns the same hash value 
though the keys/expr are not same

Hello Jayesh,

I have masked the input values with .


Thanks & Regards,
Gokula Krishnan (Gokul)


On Wed, Sep 26, 2018 at 2:20 PM Thakrar, Jayesh <jthak...@conversantmedia.com> wrote:
Cannot reproduce your situation.
Can you share Spark version?

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_92)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.sql("select hash('40514X'),hash('41751')").show()
++---+
|hash(40514X)|hash(41751)|
++---+
| -1898845883|  916273350|
++---+


scala> spark.sql("select hash('14589'),hash('40004')").show()
+---+---+
|hash(14589)|hash(40004)|
+---+---+
|  777096871|-1593820563|
+---+---+


scala>

From: Gokula Krishnan D <email2...@gmail.com>
Date: Tuesday, September 25, 2018 at 8:57 PM
To: user <user@spark.apache.org>
Subject: [Spark SQL] why spark sql hash() are returns the same hash value 
though the keys/expr are not same

Hello All,

I am calculating the hash value of a few columns to determine whether a record is an
Insert/Delete/Update, but I found a scenario which is a little weird: some of the
records return the same hash value even though the keys are totally different.

For the instance,


scala> spark.sql("select hash('40514X'),hash('41751')").show()

+---+---+

|hash(40514)|hash(41751)|

+---+---+

|  976573657|  976573657|

+---+---+


scala> spark.sql("select hash('14589'),hash('40004')").show()

+---+---+

|hash(14589)|hash(40004)|

+---+---+

|  777096871|  777096871|

+---+---+
I do understand that hash() returns an integer; have these values reached the max
value?

Thanks & Regards,
Gokula Krishnan (Gokul)


Re: [Spark SQL] why spark sql hash() are returns the same hash value though the keys/expr are not same

2018-09-28 Thread Gokula Krishnan D
Hello Jayesh,

I have masked the input values with .


Thanks & Regards,
Gokula Krishnan (Gokul)


On Wed, Sep 26, 2018 at 2:20 PM Thakrar, Jayesh <
jthak...@conversantmedia.com> wrote:

> Cannot reproduce your situation.
>
> Can you share Spark version?
>
>
>
> Welcome to
>
>     __
>
>  / __/__  ___ _/ /__
>
> _\ \/ _ \/ _ `/ __/  '_/
>
>/___/ .__/\_,_/_/ /_/\_\   version 2.2.0
>
>   /_/
>
>
>
> Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java
> 1.8.0_92)
>
> Type in expressions to have them evaluated.
>
> Type :help for more information.
>
>
>
> scala> spark.sql("select hash('40514X'),hash('41751')").show()
>
> ++---+
>
> |hash(40514X)|hash(41751)|
>
> ++---+
>
> | -1898845883|  916273350|
>
> ++---+
>
>
>
>
>
> scala> spark.sql("select hash('14589'),hash('40004')").show()
>
> +---+---+
>
> |hash(14589)|hash(40004)|
>
> +---+---+
>
> |  777096871|-1593820563|
>
> +---+---+
>
>
>
>
>
> scala>
>
>
>
> *From: *Gokula Krishnan D 
> *Date: *Tuesday, September 25, 2018 at 8:57 PM
> *To: *user 
> *Subject: *[Spark SQL] why spark sql hash() are returns the same hash
> value though the keys/expr are not same
>
>
>
> Hello All,
>
>
>
> I am calculating the hash value  of few columns and determining whether
> its an Insert/Delete/Update Record but found a scenario which is little
> weird since some of the records returns same hash value though the key's
> are totally different.
>
>
>
> For the instance,
>
>
>
> scala> spark.sql("select hash('40514X'),hash('41751')").show()
>
> +---+---+
>
> |hash(40514)|hash(41751)|
>
> +---+---+
>
> |  976573657|  976573657|
>
> +---+---+
>
>
>
> scala> spark.sql("select hash('14589'),hash('40004')").show()
>
> +---+---+
>
> |hash(14589)|hash(40004)|
>
> +---+---+
>
> |  777096871|  777096871|
>
> +---+---+
>
> I do understand that hash() returns an integer, are these reached the max
> value?.
>
>
>
> Thanks & Regards,
>
> Gokula Krishnan* (Gokul)*
>


Spark checkpointing

2018-09-28 Thread katze maus

Hi,

is there any way to read up on using Spark checkpointing (programmatically) in an in-depth manner?
I have an application where I perform multiple operations on a DStream. To my understanding, the result of those operations is a new DStream,
which can be used for further operations.

This leads to a chain of operations which can be described as follows (a rough sketch of the pipeline is given after the list):
1) Periodically read data from Kafka (checkpoint this DStream)
2) Create a window on the read data
3) Aggregate on the respective windows (checkpoint this DStream)
4) Write to Kafka (checkpoint this DStream)
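
For context, this is roughly how the chain is wired up (simplified sketch; the batch
interval, window sizes, topic names and checkpoint directory are placeholders, and the
per-DStream checkpoint() calls mentioned above are left out here):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val conf = new SparkConf().setAppName("checkpoint-example")
val ssc  = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("hdfs:///tmp/checkpoints") // placeholder checkpoint directory

val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "broker:9092",
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "example-group")

// 1) periodically read from Kafka
val input = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("in-topic"), kafkaParams)
).map(_.value())

// 2) window over the read data, 3) aggregate per window
val counts = input.window(Seconds(60), Seconds(20)).countByValue()

// 4) write the results out (printing here instead of producing back to Kafka)
counts.foreachRDD(rdd => rdd.take(10).foreach(println))

ssc.start()
ssc.awaitTermination()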

In the above chain of operations the stream fails with the following error: "WindowedDStream has been marked for checkpointing but the storage level has not been set to enable persisting".
I found out that a WindowedDStream does not support persisting data, to avoid unnecessary copies of the data.

Basically my issue is that I am not able to use a window operation in a checkpointed environment:
- calling checkpoint on all DStreams fails, because the WindowedDStream does not support persisting
- calling checkpoint on all DStreams except the WindowedDStream fails, because somehow it is still marked for checkpointing.

Hopefully someone has an idea.
Kind regards




How to repartition Spark DStream Kafka ConsumerRecord RDD.

2018-09-28 Thread Alchemist
How do I repartition a Spark DStream Kafka ConsumerRecord RDD? I am getting
uneven sizes of Kafka topics, so we want to repartition the input RDD based on
some logic.
But when I try to apply the repartition I get an "object not serializable
(class: org.apache.kafka.clients.consumer.ConsumerRecord)" error. I found the
following workaround:

https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html

Call rdd.forEachPartition and create the NotSerializable object in there, like this:

rdd.forEachPartition(iter -> {
  NotSerializable notSerializable = new NotSerializable();
  // ...Now process iter
});
APPLIED HERE:

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParam)
).map(_.value())

stream.foreachRDD { rdd =>
  val repartitionRDD = flow.repartitionRDD(rdd, 1)
  println("&& repartitionRDD " + repartitionRDD.count())

  val modifiedRDD = rdd.mapPartitions { iter =>
    val customerRecords: List[ConsumerRecord[String, String]] =
      List[ConsumerRecord[String, String]]()
    while (iter.hasNext) {
      val consumerRecord: ConsumerRecord[String, String] = iter.next()
      customerRecords :+ consumerRecord
    }
    customerRecords.iterator
  }

  val r = modifiedRDD.repartition(1)
  println("* after repartition " + r.count())
}
BUT I am still getting the same object-not-serializable error. Any help is greatly
appreciated.
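
For comparison, here is a minimal sketch of the "extract only what you need before the
shuffle" approach (untested; it reuses the ssc/topics/kafkaParam names from the snippet
above), so that no ConsumerRecord ever has to be serialized:

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParam))

stream.foreachRDD { rdd =>
  // (key, value) pairs are plain Strings and can safely cross a repartition,
  // unlike ConsumerRecord, which is not serializable.
  val keyedValues = rdd.map(record => (record.key, record.value))
  val repartitioned = keyedValues.repartition(1)
  println("count after repartition: " + repartitioned.count())
}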

Re: Need to convert Dataset to HashMap

2018-09-28 Thread Alessandro Solimando
Hi,
sorry, indeed you have to cache the dataset before the groupBy (otherwise
it will be re-loaded from disk each time).

For the case class you can have a look at the accepted answer here:
https://stackoverflow.com/questions/45017556/how-to-convert-a-simple-dataframe-to-a-dataset-spark-scala-with-case-class
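
To make that concrete, here is a minimal sketch of the two suggestions combined (the
column name, schema and CSV/gzip reading options are assumptions; adjust them to your
data):

import org.apache.spark.sql.SparkSession

case class Record(someColumn: String) // hypothetical schema, one field per profiled column

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Cache once so the gzipped file is not re-read for every column.
val ds = spark.read.option("header", "true").csv("data.csv.gz").as[Record].cache()
val numOfRows = ds.count()

// Aggregate on the cluster and only collect the small (value, count) pairs.
val freqMap: Map[String, String] = ds
  .groupBy($"someColumn")
  .count()
  .as[(String, Long)]
  .collect()
  .map { case (value, cnt) => value -> (cnt * 100.0 / numOfRows).toString }
  .toMap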


Best regards,
Alessandro

On Fri, 28 Sep 2018 at 09:29, rishmanisation 
wrote:

> Thanks for the response! I'm not sure caching 'freq' would make sense,
> since
> there are multiple columns in the file and so it will need to be different
> for different columns.
>
> Original data format is .gz (gzip).
>
> I am a newbie to Spark, so could you please give a little more details on
> the appropriate case class?
>
> Thanks!
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Need to convert Dataset to HashMap

2018-09-28 Thread rishmanisation
Thanks for the response! I'm not sure caching 'freq' would make sense, since
there are multiple columns in the file and so it will need to be different
for different columns.

Original data format is .gz (gzip).

I am a newbie to Spark, so could you please give a little more details on
the appropriate case class?

Thanks!






Re: Need to convert Dataset to HashMap

2018-09-28 Thread Alessandro Solimando
Hi,
as a first attempt I would try to cache "freq", to be sure that the dataset
is not re-loaded at each iteration later on.

Btw, what's the original data format you are importing from?

I suspect also that an appropriate case class rather than Row would help as
well, instead of converting to String and parsing it "manually".

Hth,
Alessandro

On Fri, 28 Sep 2018 at 01:48, rishmanisation 
wrote:

> I am writing a data-profiling application that needs to iterate over a
> large
> .gz file (imported as a Dataset). Each key-value pair in the hashmap
> will be the row value and the number of times it occurs in the column.
> There
> is one hashmap for each column, and they are all added to a JSON at the
> end.
>
> For now, I am using the following logic to generate the hashmap for a
> column:
>
> Dataset<Row> freq = df
> .groupBy(columnName)
> .count();
>
> HashMap<String, String> myHashMap = new HashMap<>();
>
> Iterator<Row> rowIterator = freq.toLocalIterator();
> while(rowIterator.hasNext()) {
> Row currRow = rowIterator.next();
> String rowString = currRow.toString();
> String[] contents = rowString.substring(1, rowString.length() -
> 1).split(",");
> Double percent = Long.valueOf(contents[1])*100.0/numOfRows;
> myHashMap.put(contents[0], Double.toString(percent));
> }
>
> I have also tried converting to RDD and using the collectAsMap() function,
> but both of these are taking a very long time (about 5 minutes per column,
> where each column has approx. 30 million rows). Is there a more efficient
> way to achieve the same?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>