Re: java.lang.OutOfMemoryError: unable to create new native thread

2016-10-31 Thread kant kodali
Hi Vadim,

Thank you so much, this was a very useful command. This conversation is
going on here

https://www.mail-archive.com/user@spark.apache.org/msg58656.html

or you can just google "

why spark driver program is creating so many threads? How can I limit this
number?

"

please take a look if you are interested.

Thanks a lot!
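
For reference, the /proc status check suggested below can also be read from inside the driver JVM itself. A minimal Java sketch (Linux-only; the class and method names are made up for illustration):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

// Illustrative sketch (Linux-only): reads the same "Threads:" counter that
// the /proc status check shows, but from inside the running JVM.
public class ProcStatusThreads {

    public static long threadCount() throws IOException {
        for (String line : Files.readAllLines(Paths.get("/proc/self/status"))) {
            if (line.startsWith("Threads:")) {
                return Long.parseLong(line.substring("Threads:".length()).trim());
            }
        }
        return -1; // field not found (no Linux-style /proc layout)
    }

    public static void main(String[] args) throws IOException {
        System.out.println("Threads: " + threadCount());
    }
}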

On Mon, Oct 31, 2016 at 8:14 AM, Vadim Semenov 
wrote:

> Have you tried to get number of threads in a running process using `cat
> /proc//status` ?
>
> On Sun, Oct 30, 2016 at 11:04 PM, kant kodali  wrote:
>
>> yes I did run ps -ef | grep "app_name" and it is root.
>>
>>
>>
>> On Sun, Oct 30, 2016 at 8:00 PM, Chan Chor Pang 
>> wrote:
>>
>>> sorry, the UID
>>>
>>> On 10/31/16 11:59 AM, Chan Chor Pang wrote:
>>>
>>> actually if the max user processes is not the problem, i have no idea
>>>
>>> but i still suspecting the user,
>>> as the user who run spark-submit is not necessary the pid for the JVM
>>> process
>>>
>>> can u make sure when you "ps -ef | grep {your app id} " the PID is root?
>>> On 10/31/16 11:21 AM, kant kodali wrote:
>>>
>>> The java process is run by the root and it has the same config
>>>
>>> sudo -i
>>>
>>> ulimit -a
>>>
>>> core file size  (blocks, -c) 0
>>> data seg size   (kbytes, -d) unlimited
>>> scheduling priority (-e) 0
>>> file size   (blocks, -f) unlimited
>>> pending signals (-i) 120242
>>> max locked memory   (kbytes, -l) 64
>>> max memory size (kbytes, -m) unlimited
>>> open files  (-n) 1024
>>> pipe size(512 bytes, -p) 8
>>> POSIX message queues (bytes, -q) 819200
>>> real-time priority  (-r) 0
>>> stack size  (kbytes, -s) 8192
>>> cpu time   (seconds, -t) unlimited
>>> max user processes  (-u) 120242
>>> virtual memory  (kbytes, -v) unlimited
>>> file locks  (-x) unlimited
>>>
>>>
>>>
>>> On Sun, Oct 30, 2016 at 7:01 PM, Chan Chor Pang 
>>> wrote:
>>>
 I have the same Exception before and the problem fix after i change the
 nproc conf.

 > max user processes  (-u) 120242
 ↑this config does looks good.
 are u sure the user who run ulimit -a is the same user who run the Java
 process?
 depend on how u submit the job and your setting, spark job may execute
 by other user.


 On 10/31/16 10:38 AM, kant kodali wrote:

 when I did this

 cat /proc/sys/kernel/pid_max

 I got 32768

 On Sun, Oct 30, 2016 at 6:36 PM, kant kodali 
 wrote:

> I believe for ubuntu it is unlimited but I am not 100% sure (I just
> read somewhere online). I ran ulimit -a and this is what I get
>
> core file size  (blocks, -c) 0
> data seg size   (kbytes, -d) unlimited
> scheduling priority (-e) 0
> file size   (blocks, -f) unlimited
> pending signals (-i) 120242
> max locked memory   (kbytes, -l) 64
> max memory size (kbytes, -m) unlimited
> open files  (-n) 1024
> pipe size(512 bytes, -p) 8
> POSIX message queues (bytes, -q) 819200
> real-time priority  (-r) 0
> stack size  (kbytes, -s) 8192
> cpu time   (seconds, -t) unlimited
> max user processes  (-u) 120242
> virtual memory  (kbytes, -v) unlimited
> file locks  (-x) unlimited
>
> On Sun, Oct 30, 2016 at 6:15 PM, Chan Chor Pang <
> chin...@indetail.co.jp> wrote:
>
>> not sure for ubuntu, but i think you can just create the file by
>> yourself
>> the syntax will be the same as /etc/security/limits.conf
>>
>> nproc.conf not only limit java process but all process by the same
>> user
>> so even the jvm process does nothing,  if the corresponding user is
>> busy in other way
>> the jvm process will still not able to create new thread.
>>
>> btw the default limit for centos is 1024
>>
>>
>> On 10/31/16 9:51 AM, kant kodali wrote:
>>
>>
>> On Sun, Oct 30, 2016 at 5:22 PM, Chan Chor Pang <
>> chin...@indetail.co.jp> wrote:
>>
>>> /etc/security/limits.d/90-nproc.conf
>>>
>>
>> Hi,
>>
>> I am using Ubuntu 16.04 LTS. I have this directory
>> /etc/security/limits.d/ but I don't have any files underneath it. This
>> error happens after running for 4 to 5 hours. I wonder if this is a GC
>> issue? And I am thinking if I should use CMS. I have also posted this on 
>> SO
>> since I 

RE: Out Of Memory issue

2016-10-31 Thread Kürşat Kurt
Any idea about this?

 

From: Kürşat Kurt [mailto:kur...@kursatkurt.com] 
Sent: Sunday, October 30, 2016 7:59 AM
To: 'Jörn Franke' 
Cc: 'user@spark.apache.org' 
Subject: RE: Out Of Memory issue

 

Hi Jörn;

 

I am reading a 300,000-line CSV file. It is “ß”-separated (attached sample file). 
The first column is the class name and the second column is the product name.

Java version is 1.8.108, single node. Furthermore (as you can see in the code) I 
tried random forests and this gets OOM too.

 

 

 

Code :

package main.scala
 
import java.util.Locale
import org.apache.spark.SparkConf
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.ml.feature.IndexToString
import org.apache.spark.ml.feature.StandardScaler
import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.Tokenizer
import org.apache.spark.sql.SparkSession
import com.hrzafer.reshaturkishstemmer.Resha
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.feature.IDF
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.NGram
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.classification.LogisticRegression
import scala.collection.mutable.ListBuffer
import org.apache.spark.ml.classification.OneVsRest
import org.apache.spark.storage.StorageLevel
 
 
object Test1 {
 
  var num = 50;
  var savePath = "hdfs://localhost:54310/SparkWork/SparkModel/";
  var stemmer = Resha.Instance
 
  var STOP_WORDS: Set[String] = Set();
 
  def cropSentence(s: String) = {
s.replaceAll("\\([^\\)]*\\)", "")
  .replaceAll("(\\d+)(gb|GB)  ", "$1 $2")
  .replaceAll(" - ", " ")
  .replaceAll("-", " ")
  .replaceAll(" tr. ", " ")
  .replaceAll("  +", " ")
  .replaceAll(",", " ").trim();
  }
 
  def main(args: Array[String]): Unit = {
 
val start1 = System.currentTimeMillis();
 
val sc = new SparkConf().setAppName("Test")
.set("spark.hadoop.validateOutputSpecs", "false")
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
 
 
val spark = SparkSession.builder.appName("Java Spark").config(sc).getOrCreate();
import spark.implicits._
 
val mainDataset = spark.sparkContext.textFile("hdfs://localhost:54310/SparkWork/classifications.csv")
  .map(_.split("ß"))
  .map(tokens => {
    var list = new ListBuffer[String]();
    var token0 = cropSentence(tokens(0).toLowerCase(Locale.forLanguageTag("TR-tr")));
    token0.split("\\s+  ").map { list += stemmer.stem(_) }
    (tokens(1), list.toList.mkString(" "))
  }).persist(StorageLevel.MEMORY_AND_DISK).toDF("className", "productName");
 
 
val classIndexer = new StringIndexer()
  .setInputCol("className")
  .setOutputCol("label");
 
val classIndexerModel = classIndexer.fit(mainDataset);
var mainDS=classIndexerModel.transform(mainDataset);
classIndexerModel.write.overwrite.save(savePath + "ClassIndexer");
 //Tokenizer
  val tokenizer = new Tokenizer()
   .setInputCol("productName") 
   .setOutputCol("words_nonfiltered")
   ;
 
//StopWords
  val remover = new StopWordsRemover()
 .setInputCol("words_nonfiltered")
 .setOutputCol("words")
 .setStopWords(Array[String]("garanti","garantili","resmi","distribütör","cep","tel","-","//"));
 
//CountVectorize
 
  val countVectorizer = new CountVectorizer()
 .setInputCol("words")
 .setOutputCol("features");
 
 
  val  rfc = new RandomForestClassifier ()  
  .setLabelCol("label")
  .setNumTrees(3)
  .setMaxDepth(3)  
  .setFeatureSubsetStrategy("auto")
  .setFeaturesCol("features")
  .setImpurity("gini")  
  .setMaxBins(3);
 
 
  val nb = new NaiveBayes()
   .setSmoothing(0.1)
   .setModelType("multinomial")
 
 
   val pipeline = new Pipeline().setStages(Array(tokenizer,remover,countVectorizer,nb));
 
 
   val splits = mainDS.randomSplit(Array(0.80,0.20));
   val train =splits(0);
   //train.show(num,false);
   val test = splits(1);
   //test.show(num,false);
 
   //mainDataset.show(100,false);

Re: Efficient filtering on Spark SQL dataframes with ordered keys

2016-10-31 Thread Mich Talebzadeh
Well, I suppose one can drop the tempTable as below

scala> df.registerTempTable("tmp")

scala> spark.sql("select count(1) from tmp").show
++
|count(1)|
++
|  904180|
++

scala> spark.sql("drop table if exists tmp")
res22: org.apache.spark.sql.DataFrame = []

Also your point

"But the thing is that I don't explicitly cache the tempTables ..".

I believe tempTable is created in-memory and is already cached

HTH
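
For reference, a minimal Java sketch of the same register/query/drop cycle using the Spark 2.x session API (the view name and the DataFrame are illustrative, not taken from this thread):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Illustrative sketch: register a DataFrame as a temporary view, query it,
// then drop the view so the session no longer references it.
public class TempViewLifecycle {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("TempViewLifecycle").getOrCreate();

        // "df" stands in for whatever DataFrame is being registered.
        Dataset<Row> df = spark.range(1000).toDF("id");

        df.createOrReplaceTempView("tmp");   // Spark 2.x equivalent of registerTempTable("tmp")
        spark.sql("select count(1) from tmp").show();

        spark.catalog().dropTempView("tmp"); // analogous to sqlContext.dropTempTable("tmp")

        spark.stop();
    }
}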

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 31 October 2016 at 14:16, Michael David Pedersen <
michael.d.peder...@googlemail.com> wrote:

> Hi Mich,
>
> Thank you again for your reply.
>
> As I see you are caching the table already sorted
>>
>> val keyValRDDSorted = keyValRDD.sortByKey().cache
>>
>> and the next stage is you are creating multiple tempTables (different
>> ranges) that cache a subset of rows already cached in RDD. The data stored
>> in tempTable is in Hive columnar format (I assume that means ORC format)
>>
>
> But the thing is that I don't explicitly cache the tempTables, and I don't
> really want to because I'll only run a single query on each tempTable. So I
> expect the SQL query processor to operate directly on the underlying
> key-value RDD, and my concern is that this may be inefficient.
>
>
>> Well that is all you can do.
>>
>
> Ok, thanks - that's really what I wanted to get confirmation of.
>
>
>> Bear in mind that these tempTables are immutable and I do not know any
>> way of dropping tempTable to free more memory.
>>
>
> I'm assuming there won't be any (significant) memory overhead of
> registering the temp tables as long as I don't explicitly cache them. Am I
> wrong? In any case I'll be calling sqlContext.dropTempTable once the query
> has completed, which according to the documentation should also free up
> memory.
>
> Cheers,
> Michael
>


Re: Running Hive and Spark together with Dynamic Resource Allocation

2016-10-31 Thread rachmaninovquartet
It seems like the best solution is to set: yarn.nodemanager.aux-services to
mapred_shuffle,spark_shuffle



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Running-Hive-and-Spark-together-with-Dynamic-Resource-Allocation-tp27968p27978.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: why spark driver program is creating so many threads? How can I limit this number?

2016-10-31 Thread Shixiong(Ryan) Zhu
If there are leaking threads, I think you should be able to see the number
of threads increasing. You can just dump the threads after 1-2 hours.
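
For illustration, a minimal Java sketch of monitoring the thread count periodically from inside the driver, so the growth is visible before the OutOfMemoryError (the class name and interval are made up):

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: log the JVM's live thread count once a minute so a
// steady climb is visible long before the OutOfMemoryError.
public class ThreadCountLogger {

    public static void start() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "thread-count-logger");
            t.setDaemon(true); // don't keep the JVM alive just for this logger
            return t;
        });
        scheduler.scheduleAtFixedRate(
                () -> System.out.println("live threads: " + threads.getThreadCount()
                        + ", peak: " + threads.getPeakThreadCount()),
                0, 1, TimeUnit.MINUTES);
    }

    public static void main(String[] args) throws InterruptedException {
        start();
        Thread.sleep(3 * 60 * 1000); // keep this demo JVM alive for a few samples
    }
}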

On Mon, Oct 31, 2016 at 12:59 PM, kant kodali  wrote:

> yes I can certainly use jstack but it requires 4 to 5 hours for me to
> reproduce the error so I can get back as early as possible.
>
> Thanks a lot!
>
> On Mon, Oct 31, 2016 at 12:41 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Then it should not be a Receiver issue. Could you use `jstack` to find
>> out the name of leaking threads?
>>
>> On Mon, Oct 31, 2016 at 12:35 PM, kant kodali  wrote:
>>
>>> Hi Ryan,
>>>
>>> It happens on the driver side and I am running on a client mode (not the
>>> cluster mode).
>>>
>>> Thanks!
>>>
>>> On Mon, Oct 31, 2016 at 12:32 PM, Shixiong(Ryan) Zhu <
>>> shixi...@databricks.com> wrote:
>>>
 Sorry, there is a typo in my previous email: this may **not** be the
 root cause if the leak threads are in the driver side.

 Does it happen in the driver or executors?

 On Mon, Oct 31, 2016 at 12:20 PM, kant kodali 
 wrote:

> Hi Ryan,
>
> Ahh My Receiver.onStop method is currently empty.
>
> 1) I have a hard time seeing why the receiver would crash so many times 
> within a span of 4 to 5 hours but anyways I understand I should still 
> cleanup during OnStop.
>
> 2) How do I clean up those threads? The documentation here 
> https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html doesn't 
> seem to have any method where I can clean up the threads created during 
> OnStart. any ideas?
>
> Thanks!
>
>
> On Mon, Oct 31, 2016 at 11:58 AM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> So in your code, each Receiver will start a new thread. Did you stop
>> the receiver properly in `Receiver.onStop`? Otherwise, you may leak 
>> threads
>> after a receiver crashes and is restarted by Spark. However, this may be
>> the root cause since the leak threads are in the driver side. Could you 
>> use
>> `jstack` to check which types of threads are leaking?
>>
>> On Mon, Oct 31, 2016 at 11:50 AM, kant kodali 
>> wrote:
>>
>>> I am also under the assumption that *onStart *function of the
>>> Receiver is only called only once by Spark. please correct me if I
>>> am wrong.
>>>
>>> On Mon, Oct 31, 2016 at 11:35 AM, kant kodali 
>>> wrote:
>>>
 My driver program runs a spark streaming job.  And it spawns a
 thread by itself only in the *onStart()* function below Other than
 that it doesn't spawn any other threads. It only calls MapToPair,
 ReduceByKey, forEachRDD, Collect functions.

 public class NSQReceiver extends Receiver {

 private String topic="";

 public NSQReceiver(String topic) {
 super(StorageLevel.MEMORY_AND_DISK_2());
 this.topic = topic;
 }

 @Override
 public void *onStart()* {
 new Thread()  {
 @Override public void run() {
 receive();
 }
 }.start();
 }

 }


 Environment info:

 Java 8

 Scala 2.11.8

 Spark 2.0.0

 More than happy to share any other info you may need.


 On Mon, Oct 31, 2016 at 11:05 AM, Jakob Odersky 
 wrote:

>  > how do I tell my spark driver program to not create so many?
>
> This may depend on your driver program. Do you spawn any threads in
> it? Could you share some more information on the driver program,
> spark
> version and your environment? It would greatly help others to help
> you
>
> On Mon, Oct 31, 2016 at 3:47 AM, kant kodali 
> wrote:
> > The source of my problem is actually that I am running into the
> following
> > error. This error seems to happen after running my driver
> program for 4
> > hours.
> >
> > "Exception in thread "ForkJoinPool-50-worker-11" Exception in
> thread
> > "dag-scheduler-event-loop" Exception in thread
> "ForkJoinPool-50-worker-13"
> > java.lang.OutOfMemoryError: unable to create new native thread"
> >
> > and this wonderful book taught me that the error "unable to
> create new
> > native thread" can happen because JVM is trying to request the
> OS for a
> > thread and it is refusing to do so for the following reasons
> >
> > 1. The 

Re: why spark driver program is creating so many threads? How can I limit this number?

2016-10-31 Thread kant kodali
yes I can certainly use jstack but it requires 4 to 5 hours for me to
reproduce the error so I can get back as early as possible.

Thanks a lot!

On Mon, Oct 31, 2016 at 12:41 PM, Shixiong(Ryan) Zhu <
shixi...@databricks.com> wrote:

> Then it should not be a Receiver issue. Could you use `jstack` to find out
> the name of leaking threads?
>
> On Mon, Oct 31, 2016 at 12:35 PM, kant kodali  wrote:
>
>> Hi Ryan,
>>
>> It happens on the driver side and I am running on a client mode (not the
>> cluster mode).
>>
>> Thanks!
>>
>> On Mon, Oct 31, 2016 at 12:32 PM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> Sorry, there is a typo in my previous email: this may **not** be the
>>> root cause if the leak threads are in the driver side.
>>>
>>> Does it happen in the driver or executors?
>>>
>>> On Mon, Oct 31, 2016 at 12:20 PM, kant kodali 
>>> wrote:
>>>
 Hi Ryan,

 Ahh My Receiver.onStop method is currently empty.

 1) I have a hard time seeing why the receiver would crash so many times 
 within a span of 4 to 5 hours but anyways I understand I should still 
 cleanup during OnStop.

 2) How do I clean up those threads? The documentation here 
 https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html doesn't 
 seem to have any method where I can clean up the threads created during 
 OnStart. any ideas?

 Thanks!


 On Mon, Oct 31, 2016 at 11:58 AM, Shixiong(Ryan) Zhu <
 shixi...@databricks.com> wrote:

> So in your code, each Receiver will start a new thread. Did you stop
> the receiver properly in `Receiver.onStop`? Otherwise, you may leak 
> threads
> after a receiver crashes and is restarted by Spark. However, this may be
> the root cause since the leak threads are in the driver side. Could you 
> use
> `jstack` to check which types of threads are leaking?
>
> On Mon, Oct 31, 2016 at 11:50 AM, kant kodali 
> wrote:
>
>> I am also under the assumption that *onStart *function of the
>> Receiver is only called only once by Spark. please correct me if I
>> am wrong.
>>
>> On Mon, Oct 31, 2016 at 11:35 AM, kant kodali 
>> wrote:
>>
>>> My driver program runs a spark streaming job.  And it spawns a
>>> thread by itself only in the *onStart()* function below Other than
>>> that it doesn't spawn any other threads. It only calls MapToPair,
>>> ReduceByKey, forEachRDD, Collect functions.
>>>
>>> public class NSQReceiver extends Receiver {
>>>
>>> private String topic="";
>>>
>>> public NSQReceiver(String topic) {
>>> super(StorageLevel.MEMORY_AND_DISK_2());
>>> this.topic = topic;
>>> }
>>>
>>> @Override
>>> public void *onStart()* {
>>> new Thread()  {
>>> @Override public void run() {
>>> receive();
>>> }
>>> }.start();
>>> }
>>>
>>> }
>>>
>>>
>>> Environment info:
>>>
>>> Java 8
>>>
>>> Scala 2.11.8
>>>
>>> Spark 2.0.0
>>>
>>> More than happy to share any other info you may need.
>>>
>>>
>>> On Mon, Oct 31, 2016 at 11:05 AM, Jakob Odersky 
>>> wrote:
>>>
  > how do I tell my spark driver program to not create so many?

 This may depend on your driver program. Do you spawn any threads in
 it? Could you share some more information on the driver program,
 spark
 version and your environment? It would greatly help others to help
 you

 On Mon, Oct 31, 2016 at 3:47 AM, kant kodali 
 wrote:
 > The source of my problem is actually that I am running into the
 following
 > error. This error seems to happen after running my driver program
 for 4
 > hours.
 >
 > "Exception in thread "ForkJoinPool-50-worker-11" Exception in
 thread
 > "dag-scheduler-event-loop" Exception in thread
 "ForkJoinPool-50-worker-13"
 > java.lang.OutOfMemoryError: unable to create new native thread"
 >
 > and this wonderful book taught me that the error "unable to
 create new
 > native thread" can happen because JVM is trying to request the OS
 for a
 > thread and it is refusing to do so for the following reasons
 >
 > 1. The system has actually run out of virtual memory.
 > 2. On Unix-style systems, the user has already created (between
 all programs
 > user is running) the maximum number of processes configured for
 that user
 > login. Individual threads are considered a process in that regard.
 >
 > Option #2 is ruled out in my 

Re: why spark driver program is creating so many threads? How can I limit this number?

2016-10-31 Thread Shixiong(Ryan) Zhu
Then it should not be a Receiver issue. Could you use `jstack` to find out
the name of leaking threads?
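
If attaching jstack is inconvenient, roughly the same information can be pulled from inside the JVM. A minimal illustrative sketch (the class name and the prefix-grouping rule are made up):

import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch: count live threads by name prefix (the part before a
// trailing "-<number>") to see which pool keeps growing, similar to what a
// jstack dump would reveal.
public class ThreadNameHistogram {

    public static void dump() {
        Map<String, Integer> byPrefix = new TreeMap<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            String prefix = t.getName().replaceAll("-\\d+$", "");
            byPrefix.merge(prefix, 1, Integer::sum);
        }
        byPrefix.forEach((name, count) -> System.out.println(count + "\t" + name));
    }

    public static void main(String[] args) {
        dump();
    }
}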

On Mon, Oct 31, 2016 at 12:35 PM, kant kodali  wrote:

> Hi Ryan,
>
> It happens on the driver side and I am running on a client mode (not the
> cluster mode).
>
> Thanks!
>
> On Mon, Oct 31, 2016 at 12:32 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Sorry, there is a typo in my previous email: this may **not** be the
>> root cause if the leak threads are in the driver side.
>>
>> Does it happen in the driver or executors?
>>
>> On Mon, Oct 31, 2016 at 12:20 PM, kant kodali  wrote:
>>
>>> Hi Ryan,
>>>
>>> Ahh My Receiver.onStop method is currently empty.
>>>
>>> 1) I have a hard time seeing why the receiver would crash so many times 
>>> within a span of 4 to 5 hours but anyways I understand I should still 
>>> cleanup during OnStop.
>>>
>>> 2) How do I clean up those threads? The documentation here 
>>> https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html doesn't 
>>> seem to have any method where I can clean up the threads created during 
>>> OnStart. any ideas?
>>>
>>> Thanks!
>>>
>>>
>>> On Mon, Oct 31, 2016 at 11:58 AM, Shixiong(Ryan) Zhu <
>>> shixi...@databricks.com> wrote:
>>>
 So in your code, each Receiver will start a new thread. Did you stop
 the receiver properly in `Receiver.onStop`? Otherwise, you may leak threads
 after a receiver crashes and is restarted by Spark. However, this may be
 the root cause since the leak threads are in the driver side. Could you use
 `jstack` to check which types of threads are leaking?

 On Mon, Oct 31, 2016 at 11:50 AM, kant kodali 
 wrote:

> I am also under the assumption that *onStart *function of the Receiver
>  is only called only once by Spark. please correct me if I am wrong.
>
> On Mon, Oct 31, 2016 at 11:35 AM, kant kodali 
> wrote:
>
>> My driver program runs a spark streaming job.  And it spawns a thread
>> by itself only in the *onStart()* function below Other than that it
>> doesn't spawn any other threads. It only calls MapToPair, ReduceByKey,
>> forEachRDD, Collect functions.
>>
>> public class NSQReceiver extends Receiver {
>>
>> private String topic="";
>>
>> public NSQReceiver(String topic) {
>> super(StorageLevel.MEMORY_AND_DISK_2());
>> this.topic = topic;
>> }
>>
>> @Override
>> public void *onStart()* {
>> new Thread()  {
>> @Override public void run() {
>> receive();
>> }
>> }.start();
>> }
>>
>> }
>>
>>
>> Environment info:
>>
>> Java 8
>>
>> Scala 2.11.8
>>
>> Spark 2.0.0
>>
>> More than happy to share any other info you may need.
>>
>>
>> On Mon, Oct 31, 2016 at 11:05 AM, Jakob Odersky 
>> wrote:
>>
>>>  > how do I tell my spark driver program to not create so many?
>>>
>>> This may depend on your driver program. Do you spawn any threads in
>>> it? Could you share some more information on the driver program,
>>> spark
>>> version and your environment? It would greatly help others to help
>>> you
>>>
>>> On Mon, Oct 31, 2016 at 3:47 AM, kant kodali 
>>> wrote:
>>> > The source of my problem is actually that I am running into the
>>> following
>>> > error. This error seems to happen after running my driver program
>>> for 4
>>> > hours.
>>> >
>>> > "Exception in thread "ForkJoinPool-50-worker-11" Exception in
>>> thread
>>> > "dag-scheduler-event-loop" Exception in thread
>>> "ForkJoinPool-50-worker-13"
>>> > java.lang.OutOfMemoryError: unable to create new native thread"
>>> >
>>> > and this wonderful book taught me that the error "unable to create
>>> new
>>> > native thread" can happen because JVM is trying to request the OS
>>> for a
>>> > thread and it is refusing to do so for the following reasons
>>> >
>>> > 1. The system has actually run out of virtual memory.
>>> > 2. On Unix-style systems, the user has already created (between
>>> all programs
>>> > user is running) the maximum number of processes configured for
>>> that user
>>> > login. Individual threads are considered a process in that regard.
>>> >
>>> > Option #2 is ruled out in my case because my driver programing is
>>> running
>>> > with a userid of root which has  maximum number of processes set
>>> to 120242
>>> >
>>> > ulimit -a gives me the following
>>> >
>>> > core file size  (blocks, -c) 0
>>> > data seg size   (kbytes, -d) unlimited
>>> > scheduling priority (-e) 0
>>> > file size 

Re: why spark driver program is creating so many threads? How can I limit this number?

2016-10-31 Thread kant kodali
Hi Ryan,

It happens on the driver side and I am running on a client mode (not the
cluster mode).

Thanks!

On Mon, Oct 31, 2016 at 12:32 PM, Shixiong(Ryan) Zhu <
shixi...@databricks.com> wrote:

> Sorry, there is a typo in my previous email: this may **not** be the root
> cause if the leak threads are in the driver side.
>
> Does it happen in the driver or executors?
>
> On Mon, Oct 31, 2016 at 12:20 PM, kant kodali  wrote:
>
>> Hi Ryan,
>>
>> Ahh My Receiver.onStop method is currently empty.
>>
>> 1) I have a hard time seeing why the receiver would crash so many times 
>> within a span of 4 to 5 hours but anyways I understand I should still 
>> cleanup during OnStop.
>>
>> 2) How do I clean up those threads? The documentation here 
>> https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html doesn't seem 
>> to have any method where I can clean up the threads created during OnStart. 
>> any ideas?
>>
>> Thanks!
>>
>>
>> On Mon, Oct 31, 2016 at 11:58 AM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> So in your code, each Receiver will start a new thread. Did you stop the
>>> receiver properly in `Receiver.onStop`? Otherwise, you may leak threads
>>> after a receiver crashes and is restarted by Spark. However, this may be
>>> the root cause since the leak threads are in the driver side. Could you use
>>> `jstack` to check which types of threads are leaking?
>>>
>>> On Mon, Oct 31, 2016 at 11:50 AM, kant kodali 
>>> wrote:
>>>
 I am also under the assumption that *onStart *function of the Receiver is
 only called only once by Spark. please correct me if I am wrong.

 On Mon, Oct 31, 2016 at 11:35 AM, kant kodali 
 wrote:

> My driver program runs a spark streaming job.  And it spawns a thread
> by itself only in the *onStart()* function below Other than that it
> doesn't spawn any other threads. It only calls MapToPair, ReduceByKey,
> forEachRDD, Collect functions.
>
> public class NSQReceiver extends Receiver {
>
> private String topic="";
>
> public NSQReceiver(String topic) {
> super(StorageLevel.MEMORY_AND_DISK_2());
> this.topic = topic;
> }
>
> @Override
> public void *onStart()* {
> new Thread()  {
> @Override public void run() {
> receive();
> }
> }.start();
> }
>
> }
>
>
> Environment info:
>
> Java 8
>
> Scala 2.11.8
>
> Spark 2.0.0
>
> More than happy to share any other info you may need.
>
>
> On Mon, Oct 31, 2016 at 11:05 AM, Jakob Odersky 
> wrote:
>
>>  > how do I tell my spark driver program to not create so many?
>>
>> This may depend on your driver program. Do you spawn any threads in
>> it? Could you share some more information on the driver program, spark
>> version and your environment? It would greatly help others to help you
>>
>> On Mon, Oct 31, 2016 at 3:47 AM, kant kodali 
>> wrote:
>> > The source of my problem is actually that I am running into the
>> following
>> > error. This error seems to happen after running my driver program
>> for 4
>> > hours.
>> >
>> > "Exception in thread "ForkJoinPool-50-worker-11" Exception in thread
>> > "dag-scheduler-event-loop" Exception in thread
>> "ForkJoinPool-50-worker-13"
>> > java.lang.OutOfMemoryError: unable to create new native thread"
>> >
>> > and this wonderful book taught me that the error "unable to create
>> new
>> > native thread" can happen because JVM is trying to request the OS
>> for a
>> > thread and it is refusing to do so for the following reasons
>> >
>> > 1. The system has actually run out of virtual memory.
>> > 2. On Unix-style systems, the user has already created (between all
>> programs
>> > user is running) the maximum number of processes configured for
>> that user
>> > login. Individual threads are considered a process in that regard.
>> >
>> > Option #2 is ruled out in my case because my driver programing is
>> running
>> > with a userid of root which has  maximum number of processes set to
>> 120242
>> >
>> > ulimit -a gives me the following
>> >
>> > core file size  (blocks, -c) 0
>> > data seg size   (kbytes, -d) unlimited
>> > scheduling priority (-e) 0
>> > file size   (blocks, -f) unlimited
>> > pending signals (-i) 120242
>> > max locked memory   (kbytes, -l) 64
>> > max memory size (kbytes, -m) unlimited
>> > open files  (-n) 1024
>> > pipe size(512 bytes, -p) 8
>> > POSIX message queues (bytes, -q) 819200
>> > 

Re: why spark driver program is creating so many threads? How can I limit this number?

2016-10-31 Thread Shixiong(Ryan) Zhu
Sorry, there is a typo in my previous email: this may **not** be the root
cause if the leak threads are in the driver side.

Does it happen in the driver or executors?

On Mon, Oct 31, 2016 at 12:20 PM, kant kodali  wrote:

> Hi Ryan,
>
> Ahh My Receiver.onStop method is currently empty.
>
> 1) I have a hard time seeing why the receiver would crash so many times 
> within a span of 4 to 5 hours but anyways I understand I should still cleanup 
> during OnStop.
>
> 2) How do I clean up those threads? The documentation here 
> https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html doesn't seem 
> to have any method where I can clean up the threads created during OnStart. 
> any ideas?
>
> Thanks!
>
>
> On Mon, Oct 31, 2016 at 11:58 AM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> So in your code, each Receiver will start a new thread. Did you stop the
>> receiver properly in `Receiver.onStop`? Otherwise, you may leak threads
>> after a receiver crashes and is restarted by Spark. However, this may be
>> the root cause since the leak threads are in the driver side. Could you use
>> `jstack` to check which types of threads are leaking?
>>
>> On Mon, Oct 31, 2016 at 11:50 AM, kant kodali  wrote:
>>
>>> I am also under the assumption that *onStart *function of the Receiver is
>>> only called only once by Spark. please correct me if I am wrong.
>>>
>>> On Mon, Oct 31, 2016 at 11:35 AM, kant kodali 
>>> wrote:
>>>
 My driver program runs a spark streaming job.  And it spawns a thread
 by itself only in the *onStart()* function below Other than that it
 doesn't spawn any other threads. It only calls MapToPair, ReduceByKey,
 forEachRDD, Collect functions.

 public class NSQReceiver extends Receiver {

 private String topic="";

 public NSQReceiver(String topic) {
 super(StorageLevel.MEMORY_AND_DISK_2());
 this.topic = topic;
 }

 @Override
 public void *onStart()* {
 new Thread()  {
 @Override public void run() {
 receive();
 }
 }.start();
 }

 }


 Environment info:

 Java 8

 Scala 2.11.8

 Spark 2.0.0

 More than happy to share any other info you may need.


 On Mon, Oct 31, 2016 at 11:05 AM, Jakob Odersky 
 wrote:

>  > how do I tell my spark driver program to not create so many?
>
> This may depend on your driver program. Do you spawn any threads in
> it? Could you share some more information on the driver program, spark
> version and your environment? It would greatly help others to help you
>
> On Mon, Oct 31, 2016 at 3:47 AM, kant kodali 
> wrote:
> > The source of my problem is actually that I am running into the
> following
> > error. This error seems to happen after running my driver program
> for 4
> > hours.
> >
> > "Exception in thread "ForkJoinPool-50-worker-11" Exception in thread
> > "dag-scheduler-event-loop" Exception in thread
> "ForkJoinPool-50-worker-13"
> > java.lang.OutOfMemoryError: unable to create new native thread"
> >
> > and this wonderful book taught me that the error "unable to create
> new
> > native thread" can happen because JVM is trying to request the OS
> for a
> > thread and it is refusing to do so for the following reasons
> >
> > 1. The system has actually run out of virtual memory.
> > 2. On Unix-style systems, the user has already created (between all
> programs
> > user is running) the maximum number of processes configured for that
> user
> > login. Individual threads are considered a process in that regard.
> >
> > Option #2 is ruled out in my case because my driver programing is
> running
> > with a userid of root which has  maximum number of processes set to
> 120242
> >
> > ulimit -a gives me the following
> >
> > core file size  (blocks, -c) 0
> > data seg size   (kbytes, -d) unlimited
> > scheduling priority (-e) 0
> > file size   (blocks, -f) unlimited
> > pending signals (-i) 120242
> > max locked memory   (kbytes, -l) 64
> > max memory size (kbytes, -m) unlimited
> > open files  (-n) 1024
> > pipe size(512 bytes, -p) 8
> > POSIX message queues (bytes, -q) 819200
> > real-time priority  (-r) 0
> > stack size  (kbytes, -s) 8192
> > cpu time   (seconds, -t) unlimited
> > max user processes  (-u) 120242
> > virtual memory  (kbytes, -v) unlimited
> > file locks  (-x) unlimited
> >
> > So at this 

Re: why spark driver program is creating so many threads? How can I limit this number?

2016-10-31 Thread Sean Owen
This is more of a Java question. You don't 'clean up' threads but rather
rearchitect your app so that you don't create long running threads that
don't terminate. Consider also an Executor instead of manually creating
threads.
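
A minimal sketch of that suggestion, with the worker running on an ExecutorService that is created in onStart() and shut down in onStop(). This is illustrative only, not the poster's actual receiver, and it assumes receive() returns when interrupted:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

// Illustrative sketch, not the poster's code: the worker runs on an
// ExecutorService created in onStart() and shut down in onStop(), so a
// receiver that is stopped and restarted cannot leave orphaned threads.
public class NSQReceiverSketch extends Receiver<String> {

    private final String topic;
    private transient ExecutorService executor;

    public NSQReceiverSketch(String topic) {
        super(StorageLevel.MEMORY_AND_DISK_2());
        this.topic = topic;
    }

    @Override
    public void onStart() {
        executor = Executors.newSingleThreadExecutor();
        executor.submit(this::receive);
    }

    @Override
    public void onStop() {
        if (executor != null) {
            executor.shutdownNow(); // interrupts the worker started in onStart()
            executor = null;
        }
    }

    private void receive() {
        try {
            while (!isStopped() && !Thread.currentThread().isInterrupted()) {
                // placeholder for the real consumption loop: poll "topic" and store(...) records
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // shutdownNow() interrupted us; exit cleanly
        }
    }
}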

On Mon, Oct 31, 2016 at 7:20 PM kant kodali  wrote:

> Hi Ryan,
>
> Ahh My Receiver.onStop method is currently empty.
>
> 1) I have a hard time seeing why the receiver would crash so many times 
> within a span of 4 to 5 hours but anyways I understand I should still cleanup 
> during OnStop.
>
> 2) How do I clean up those threads? The documentation here 
> https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html doesn't seem 
> to have any method where I can clean up the threads created during OnStart. 
> any ideas?
>
> Thanks!
>
>
> On Mon, Oct 31, 2016 at 11:58 AM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
> So in your code, each Receiver will start a new thread. Did you stop the
> receiver properly in `Receiver.onStop`? Otherwise, you may leak threads
> after a receiver crashes and is restarted by Spark. However, this may be
> the root cause since the leak threads are in the driver side. Could you use
> `jstack` to check which types of threads are leaking?
>
> On Mon, Oct 31, 2016 at 11:50 AM, kant kodali  wrote:
>
> I am also under the assumption that *onStart *function of the Receiver is
> only called only once by Spark. please correct me if I am wrong.
>
> On Mon, Oct 31, 2016 at 11:35 AM, kant kodali  wrote:
>
> My driver program runs a spark streaming job.  And it spawns a thread by
> itself only in the *onStart()* function below Other than that it doesn't
> spawn any other threads. It only calls MapToPair, ReduceByKey, forEachRDD,
> Collect functions.
>
> public class NSQReceiver extends Receiver {
>
> private String topic="";
>
> public NSQReceiver(String topic) {
> super(StorageLevel.MEMORY_AND_DISK_2());
> this.topic = topic;
> }
>
> @Override
> public void *onStart()* {
> new Thread()  {
> @Override public void run() {
> receive();
> }
> }.start();
> }
>
> }
>
>
> Environment info:
>
> Java 8
>
> Scala 2.11.8
>
> Spark 2.0.0
>
> More than happy to share any other info you may need.
>
>
> On Mon, Oct 31, 2016 at 11:05 AM, Jakob Odersky  wrote:
>
>  > how do I tell my spark driver program to not create so many?
>
> This may depend on your driver program. Do you spawn any threads in
> it? Could you share some more information on the driver program, spark
> version and your environment? It would greatly help others to help you
>
> On Mon, Oct 31, 2016 at 3:47 AM, kant kodali  wrote:
> > The source of my problem is actually that I am running into the following
> > error. This error seems to happen after running my driver program for 4
> > hours.
> >
> > "Exception in thread "ForkJoinPool-50-worker-11" Exception in thread
> > "dag-scheduler-event-loop" Exception in thread
> "ForkJoinPool-50-worker-13"
> > java.lang.OutOfMemoryError: unable to create new native thread"
> >
> > and this wonderful book taught me that the error "unable to create new
> > native thread" can happen because JVM is trying to request the OS for a
> > thread and it is refusing to do so for the following reasons
> >
> > 1. The system has actually run out of virtual memory.
> > 2. On Unix-style systems, the user has already created (between all
> programs
> > user is running) the maximum number of processes configured for that user
> > login. Individual threads are considered a process in that regard.
> >
> > Option #2 is ruled out in my case because my driver programing is running
> > with a userid of root which has  maximum number of processes set to
> 120242
> >
> > ulimit -a gives me the following
> >
> > core file size  (blocks, -c) 0
> > data seg size   (kbytes, -d) unlimited
> > scheduling priority (-e) 0
> > file size   (blocks, -f) unlimited
> > pending signals (-i) 120242
> > max locked memory   (kbytes, -l) 64
> > max memory size (kbytes, -m) unlimited
> > open files  (-n) 1024
> > pipe size(512 bytes, -p) 8
> > POSIX message queues (bytes, -q) 819200
> > real-time priority  (-r) 0
> > stack size  (kbytes, -s) 8192
> > cpu time   (seconds, -t) unlimited
> > max user processes  (-u) 120242
> > virtual memory  (kbytes, -v) unlimited
> > file locks  (-x) unlimited
> >
> > So at this point I do understand that the I am running out of memory due
> to
> > allocation of threads so my biggest question is how do I tell my spark
> > driver program to not create so many?
> >
> > On Mon, Oct 31, 2016 at 3:25 AM, Sean Owen  wrote:
> >>
> >> ps -L [pid] is what shows threads. I am not sure this is 

Re: why spark driver program is creating so many threads? How can I limit this number?

2016-10-31 Thread kant kodali
Hi Ryan,

Ahh My Receiver.onStop method is currently empty.

1) I have a hard time seeing why the receiver would crash so many
times within a span of 4 to 5 hours but anyways I understand I should
still cleanup during OnStop.

2) How do I clean up those threads? The documentation here
https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html
doesn't seem to have any method where I can clean up the threads
created during OnStart. any ideas?

Thanks!
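
For question 2), one common pattern is to keep a reference to the thread started in onStart() and interrupt/join it from onStop(). A minimal illustrative sketch (not the actual receiver code; the names are made up):

// Illustrative sketch, not the actual receiver: keep a reference to the
// worker thread started in onStart() and stop it from onStop(). Thread has
// no explicit "clean up" method; the usual pattern is interrupt() plus a
// loop that checks the interrupt flag.
public class WorkerLifecycle {

    private Thread worker;

    public void onStart() {
        worker = new Thread(this::receive, "receiver-worker");
        worker.start();
    }

    public void onStop() {
        if (worker != null) {
            worker.interrupt();    // ask the loop in receive() to exit
            try {
                worker.join(5000); // wait up to 5 seconds for it to finish
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            worker = null;
        }
    }

    private void receive() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // placeholder for the real message-consumption loop
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            // interrupted by onStop(): fall through and let the thread end
        }
    }

    public static void main(String[] args) throws InterruptedException {
        WorkerLifecycle w = new WorkerLifecycle();
        w.onStart();
        Thread.sleep(1000);
        w.onStop(); // the worker exits and can be garbage collected
    }
}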


On Mon, Oct 31, 2016 at 11:58 AM, Shixiong(Ryan) Zhu <
shixi...@databricks.com> wrote:

> So in your code, each Receiver will start a new thread. Did you stop the
> receiver properly in `Receiver.onStop`? Otherwise, you may leak threads
> after a receiver crashes and is restarted by Spark. However, this may be
> the root cause since the leak threads are in the driver side. Could you use
> `jstack` to check which types of threads are leaking?
>
> On Mon, Oct 31, 2016 at 11:50 AM, kant kodali  wrote:
>
>> I am also under the assumption that *onStart *function of the Receiver is
>> only called only once by Spark. please correct me if I am wrong.
>>
>> On Mon, Oct 31, 2016 at 11:35 AM, kant kodali  wrote:
>>
>>> My driver program runs a spark streaming job.  And it spawns a thread by
>>> itself only in the *onStart()* function below Other than that it
>>> doesn't spawn any other threads. It only calls MapToPair, ReduceByKey,
>>> forEachRDD, Collect functions.
>>>
>>> public class NSQReceiver extends Receiver {
>>>
>>> private String topic="";
>>>
>>> public NSQReceiver(String topic) {
>>> super(StorageLevel.MEMORY_AND_DISK_2());
>>> this.topic = topic;
>>> }
>>>
>>> @Override
>>> public void *onStart()* {
>>> new Thread()  {
>>> @Override public void run() {
>>> receive();
>>> }
>>> }.start();
>>> }
>>>
>>> }
>>>
>>>
>>> Environment info:
>>>
>>> Java 8
>>>
>>> Scala 2.11.8
>>>
>>> Spark 2.0.0
>>>
>>> More than happy to share any other info you may need.
>>>
>>>
>>> On Mon, Oct 31, 2016 at 11:05 AM, Jakob Odersky 
>>> wrote:
>>>
  > how do I tell my spark driver program to not create so many?

 This may depend on your driver program. Do you spawn any threads in
 it? Could you share some more information on the driver program, spark
 version and your environment? It would greatly help others to help you

 On Mon, Oct 31, 2016 at 3:47 AM, kant kodali 
 wrote:
 > The source of my problem is actually that I am running into the
 following
 > error. This error seems to happen after running my driver program for
 4
 > hours.
 >
 > "Exception in thread "ForkJoinPool-50-worker-11" Exception in thread
 > "dag-scheduler-event-loop" Exception in thread
 "ForkJoinPool-50-worker-13"
 > java.lang.OutOfMemoryError: unable to create new native thread"
 >
 > and this wonderful book taught me that the error "unable to create new
 > native thread" can happen because JVM is trying to request the OS for
 a
 > thread and it is refusing to do so for the following reasons
 >
 > 1. The system has actually run out of virtual memory.
 > 2. On Unix-style systems, the user has already created (between all
 programs
 > user is running) the maximum number of processes configured for that
 user
 > login. Individual threads are considered a process in that regard.
 >
 > Option #2 is ruled out in my case because my driver programing is
 running
 > with a userid of root which has  maximum number of processes set to
 120242
 >
 > ulimit -a gives me the following
 >
 > core file size  (blocks, -c) 0
 > data seg size   (kbytes, -d) unlimited
 > scheduling priority (-e) 0
 > file size   (blocks, -f) unlimited
 > pending signals (-i) 120242
 > max locked memory   (kbytes, -l) 64
 > max memory size (kbytes, -m) unlimited
 > open files  (-n) 1024
 > pipe size(512 bytes, -p) 8
 > POSIX message queues (bytes, -q) 819200
 > real-time priority  (-r) 0
 > stack size  (kbytes, -s) 8192
 > cpu time   (seconds, -t) unlimited
 > max user processes  (-u) 120242
 > virtual memory  (kbytes, -v) unlimited
 > file locks  (-x) unlimited
 >
 > So at this point I do understand that the I am running out of memory
 due to
 > allocation of threads so my biggest question is how do I tell my spark
 > driver program to not create so many?
 >
 > On Mon, Oct 31, 2016 at 3:25 AM, Sean Owen 
 wrote:
 >>
 >> ps -L [pid] is what shows threads. I am not sure this is counting
 what you
 >> think it 

Re: why spark driver program is creating so many threads? How can I limit this number?

2016-10-31 Thread Shixiong(Ryan) Zhu
So in your code, each Receiver will start a new thread. Did you stop the
receiver properly in `Receiver.onStop`? Otherwise, you may leak threads
after a receiver crashes and is restarted by Spark. However, this may be
the root cause since the leak threads are in the driver side. Could you use
`jstack` to check which types of threads are leaking?

On Mon, Oct 31, 2016 at 11:50 AM, kant kodali  wrote:

> I am also under the assumption that *onStart *function of the Receiver is
> only called only once by Spark. please correct me if I am wrong.
>
> On Mon, Oct 31, 2016 at 11:35 AM, kant kodali  wrote:
>
>> My driver program runs a spark streaming job.  And it spawns a thread by
>> itself only in the *onStart()* function below Other than that it doesn't
>> spawn any other threads. It only calls MapToPair, ReduceByKey, forEachRDD,
>> Collect functions.
>>
>> public class NSQReceiver extends Receiver {
>>
>> private String topic="";
>>
>> public NSQReceiver(String topic) {
>> super(StorageLevel.MEMORY_AND_DISK_2());
>> this.topic = topic;
>> }
>>
>> @Override
>> public void *onStart()* {
>> new Thread()  {
>> @Override public void run() {
>> receive();
>> }
>> }.start();
>> }
>>
>> }
>>
>>
>> Environment info:
>>
>> Java 8
>>
>> Scala 2.11.8
>>
>> Spark 2.0.0
>>
>> More than happy to share any other info you may need.
>>
>>
>> On Mon, Oct 31, 2016 at 11:05 AM, Jakob Odersky 
>> wrote:
>>
>>>  > how do I tell my spark driver program to not create so many?
>>>
>>> This may depend on your driver program. Do you spawn any threads in
>>> it? Could you share some more information on the driver program, spark
>>> version and your environment? It would greatly help others to help you
>>>
>>> On Mon, Oct 31, 2016 at 3:47 AM, kant kodali  wrote:
>>> > The source of my problem is actually that I am running into the
>>> following
>>> > error. This error seems to happen after running my driver program for 4
>>> > hours.
>>> >
>>> > "Exception in thread "ForkJoinPool-50-worker-11" Exception in thread
>>> > "dag-scheduler-event-loop" Exception in thread
>>> "ForkJoinPool-50-worker-13"
>>> > java.lang.OutOfMemoryError: unable to create new native thread"
>>> >
>>> > and this wonderful book taught me that the error "unable to create new
>>> > native thread" can happen because JVM is trying to request the OS for a
>>> > thread and it is refusing to do so for the following reasons
>>> >
>>> > 1. The system has actually run out of virtual memory.
>>> > 2. On Unix-style systems, the user has already created (between all
>>> programs
>>> > user is running) the maximum number of processes configured for that
>>> user
>>> > login. Individual threads are considered a process in that regard.
>>> >
>>> > Option #2 is ruled out in my case because my driver programing is
>>> running
>>> > with a userid of root which has  maximum number of processes set to
>>> 120242
>>> >
>>> > ulimit -a gives me the following
>>> >
>>> > core file size  (blocks, -c) 0
>>> > data seg size   (kbytes, -d) unlimited
>>> > scheduling priority (-e) 0
>>> > file size   (blocks, -f) unlimited
>>> > pending signals (-i) 120242
>>> > max locked memory   (kbytes, -l) 64
>>> > max memory size (kbytes, -m) unlimited
>>> > open files  (-n) 1024
>>> > pipe size(512 bytes, -p) 8
>>> > POSIX message queues (bytes, -q) 819200
>>> > real-time priority  (-r) 0
>>> > stack size  (kbytes, -s) 8192
>>> > cpu time   (seconds, -t) unlimited
>>> > max user processes  (-u) 120242
>>> > virtual memory  (kbytes, -v) unlimited
>>> > file locks  (-x) unlimited
>>> >
>>> > So at this point I do understand that the I am running out of memory
>>> due to
>>> > allocation of threads so my biggest question is how do I tell my spark
>>> > driver program to not create so many?
>>> >
>>> > On Mon, Oct 31, 2016 at 3:25 AM, Sean Owen  wrote:
>>> >>
>>> >> ps -L [pid] is what shows threads. I am not sure this is counting
>>> what you
>>> >> think it does. My shell process has about a hundred threads, and I
>>> can't
>>> >> imagine why one would have thousands unless your app spawned them.
>>> >>
>>> >> On Mon, Oct 31, 2016 at 10:20 AM kant kodali 
>>> wrote:
>>> >>>
>>> >>> when I do
>>> >>>
>>> >>> ps -elfT | grep "spark-driver-program.jar" | wc -l
>>> >>>
>>> >>> The result is around 32K. why does it create so many threads how can
>>> I
>>> >>> limit this?
>>> >
>>> >
>>>
>>
>>
>


Re: why spark driver program is creating so many threads? How can I limit this number?

2016-10-31 Thread kant kodali
I am also under the assumption that *onStart *function of the Receiver is
only called only once by Spark. please correct me if I am wrong.

On Mon, Oct 31, 2016 at 11:35 AM, kant kodali  wrote:

> My driver program runs a spark streaming job.  And it spawns a thread by
> itself only in the *onStart()* function below Other than that it doesn't
> spawn any other threads. It only calls MapToPair, ReduceByKey, forEachRDD,
> Collect functions.
>
> public class NSQReceiver extends Receiver {
>
> private String topic="";
>
> public NSQReceiver(String topic) {
> super(StorageLevel.MEMORY_AND_DISK_2());
> this.topic = topic;
> }
>
> @Override
> public void *onStart()* {
> new Thread()  {
> @Override public void run() {
> receive();
> }
> }.start();
> }
>
> }
>
>
> Environment info:
>
> Java 8
>
> Scala 2.11.8
>
> Spark 2.0.0
>
> More than happy to share any other info you may need.
>
>
> On Mon, Oct 31, 2016 at 11:05 AM, Jakob Odersky  wrote:
>
>>  > how do I tell my spark driver program to not create so many?
>>
>> This may depend on your driver program. Do you spawn any threads in
>> it? Could you share some more information on the driver program, spark
>> version and your environment? It would greatly help others to help you
>>
>> On Mon, Oct 31, 2016 at 3:47 AM, kant kodali  wrote:
>> > The source of my problem is actually that I am running into the
>> following
>> > error. This error seems to happen after running my driver program for 4
>> > hours.
>> >
>> > "Exception in thread "ForkJoinPool-50-worker-11" Exception in thread
>> > "dag-scheduler-event-loop" Exception in thread
>> "ForkJoinPool-50-worker-13"
>> > java.lang.OutOfMemoryError: unable to create new native thread"
>> >
>> > and this wonderful book taught me that the error "unable to create new
>> > native thread" can happen because JVM is trying to request the OS for a
>> > thread and it is refusing to do so for the following reasons
>> >
>> > 1. The system has actually run out of virtual memory.
>> > 2. On Unix-style systems, the user has already created (between all
>> programs
>> > user is running) the maximum number of processes configured for that
>> user
>> > login. Individual threads are considered a process in that regard.
>> >
>> > Option #2 is ruled out in my case because my driver programing is
>> running
>> > with a userid of root which has  maximum number of processes set to
>> 120242
>> >
>> > ulimit -a gives me the following
>> >
>> > core file size  (blocks, -c) 0
>> > data seg size   (kbytes, -d) unlimited
>> > scheduling priority (-e) 0
>> > file size   (blocks, -f) unlimited
>> > pending signals (-i) 120242
>> > max locked memory   (kbytes, -l) 64
>> > max memory size (kbytes, -m) unlimited
>> > open files  (-n) 1024
>> > pipe size(512 bytes, -p) 8
>> > POSIX message queues (bytes, -q) 819200
>> > real-time priority  (-r) 0
>> > stack size  (kbytes, -s) 8192
>> > cpu time   (seconds, -t) unlimited
>> > max user processes  (-u) 120242
>> > virtual memory  (kbytes, -v) unlimited
>> > file locks  (-x) unlimited
>> >
>> > So at this point I do understand that the I am running out of memory
>> due to
>> > allocation of threads so my biggest question is how do I tell my spark
>> > driver program to not create so many?
>> >
>> > On Mon, Oct 31, 2016 at 3:25 AM, Sean Owen  wrote:
>> >>
>> >> ps -L [pid] is what shows threads. I am not sure this is counting what
>> you
>> >> think it does. My shell process has about a hundred threads, and I
>> can't
>> >> imagine why one would have thousands unless your app spawned them.
>> >>
>> >> On Mon, Oct 31, 2016 at 10:20 AM kant kodali 
>> wrote:
>> >>>
>> >>> when I do
>> >>>
>> >>> ps -elfT | grep "spark-driver-program.jar" | wc -l
>> >>>
>> >>> The result is around 32K. why does it create so many threads how can I
>> >>> limit this?
>> >
>> >
>>
>
>


Re: Performance bug in UDAF?

2016-10-31 Thread Spark User
Trying again. Hoping to find some help in figuring out the performance
bottleneck we are observing.

Thanks,
Bharath

On Sun, Oct 30, 2016 at 11:58 AM, Spark User 
wrote:

> Hi All,
>
> I have a UDAF that seems to perform poorly when its input is skewed. I
> have been debugging the UDAF implementation but I don't see any code that
> is causing the performance to degrade. More details on the data and the
> experiments I have run.
>
> DataSet: Assume 3 columns, column1 being the key.
> Column1   Column2  Column3
> a   1 x
> a   2 x
> a   3 x
> a   4 x
> a   5 x
> a   6 z
> 5 million row for a
> 
> a   100   y
> b   9 y
> b   9 y
> b   10   y
> 3 million rows for b
> ...
> more rows
> total rows is 100 million
>
>
> a has 5 million rows.Column2 for a has 1 million unique values.
> b has 3 million rows. Column2 for b has 80 unique values.
>
> Column 3 has just 100s of unique values not in the order of millions, for
> both a and b.
>
> Say totally there are 100 million rows as the input to a UDAF aggregation.
> And the skew in data is for the keys a and b. All other rows can be ignored
> and do not cause any performance issue/ hot partitions.
>
> The code does a dataSet.groupBy("Column1").agg(udaf("Column2",
> "Column3").
>
> I commented out the UDAF implementation for update and merge methods, so
> essentially the UDAF was doing nothing.
>
> With this code (empty updated and merge for UDAF) the performance for a
> mircro-batch is 16 minutes per micro-batch, micro-batch containing 100
> million rows, with 5million rows for a and 1 million unique values for
> Column2 for a.
>
> But when I pass empty values for Column2 with nothing else change,
> effectively reducing the 1 million unique values for Column2 to just 1
> unique value, empty value. The batch processing time goes down to 4 minutes.
>
> So I am trying to understand why is there such a big performance
> difference? What in UDAF causes the processing time to increase in orders
> of magnitude when there is a skew in the data as observed above?
>
> Any insight from spark developers, contributors, or anyone else who has a
> deeper understanding of UDAF would be helpful.
>
> Thanks,
> Bharath
>
>
>

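For readers unfamiliar with the API under discussion, below is a bare-bones UDAF skeleton in the Spark 2.0 Java API. It only counts rows and is not the UDAF whose performance is in question; it just shows where update() (called once per input row) and merge() (called when partial aggregates are combined) sit:

import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Skeleton of a two-column UDAF, only to show where update() and merge()
// sit; it simply counts rows and is NOT the UDAF being debugged above.
public class CountingUdaf extends UserDefinedAggregateFunction {

    @Override
    public StructType inputSchema() {
        return new StructType()
                .add("Column2", DataTypes.StringType)
                .add("Column3", DataTypes.StringType);
    }

    @Override
    public StructType bufferSchema() {
        return new StructType().add("count", DataTypes.LongType);
    }

    @Override
    public DataType dataType() {
        return DataTypes.LongType;
    }

    @Override
    public boolean deterministic() {
        return true;
    }

    @Override
    public void initialize(MutableAggregationBuffer buffer) {
        buffer.update(0, 0L);
    }

    // called once per input row of a partition
    @Override
    public void update(MutableAggregationBuffer buffer, Row input) {
        buffer.update(0, buffer.getLong(0) + 1);
    }

    // called when partial aggregates from different partitions are combined
    @Override
    public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
        buffer1.update(0, buffer1.getLong(0) + buffer2.getLong(0));
    }

    @Override
    public Object evaluate(Row buffer) {
        return buffer.getLong(0);
    }
}

It would be registered with something like spark.udf().register("countingUdaf", new CountingUdaf()) and invoked roughly as df.groupBy("Column1").agg(callUDF("countingUdaf", col("Column2"), col("Column3"))).
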

Re: why spark driver program is creating so many threads? How can I limit this number?

2016-10-31 Thread kant kodali
My driver program runs a spark streaming job.  And it spawns a thread by
itself only in the *onStart()* function below Other than that it doesn't
spawn any other threads. It only calls MapToPair, ReduceByKey, forEachRDD,
Collect functions.

public class NSQReceiver extends Receiver {

private String topic="";

public NSQReceiver(String topic) {
super(StorageLevel.MEMORY_AND_DISK_2());
this.topic = topic;
}

@Override
public void *onStart()* {
new Thread()  {
@Override public void run() {
receive();
}
}.start();
}

}


Environment info:

Java 8

Scala 2.11.8

Spark 2.0.0

More than happy to share any other info you may need.


On Mon, Oct 31, 2016 at 11:05 AM, Jakob Odersky  wrote:

>  > how do I tell my spark driver program to not create so many?
>
> This may depend on your driver program. Do you spawn any threads in
> it? Could you share some more information on the driver program, spark
> version and your environment? It would greatly help others to help you
>
> On Mon, Oct 31, 2016 at 3:47 AM, kant kodali  wrote:
> > The source of my problem is actually that I am running into the following
> > error. This error seems to happen after running my driver program for 4
> > hours.
> >
> > "Exception in thread "ForkJoinPool-50-worker-11" Exception in thread
> > "dag-scheduler-event-loop" Exception in thread
> "ForkJoinPool-50-worker-13"
> > java.lang.OutOfMemoryError: unable to create new native thread"
> >
> > and this wonderful book taught me that the error "unable to create new
> > native thread" can happen because JVM is trying to request the OS for a
> > thread and it is refusing to do so for the following reasons
> >
> > 1. The system has actually run out of virtual memory.
> > 2. On Unix-style systems, the user has already created (between all
> programs
> > user is running) the maximum number of processes configured for that user
> > login. Individual threads are considered a process in that regard.
> >
> > Option #2 is ruled out in my case because my driver programing is running
> > with a userid of root which has  maximum number of processes set to
> 120242
> >
> > ulimit -a gives me the following
> >
> > core file size  (blocks, -c) 0
> > data seg size   (kbytes, -d) unlimited
> > scheduling priority (-e) 0
> > file size   (blocks, -f) unlimited
> > pending signals (-i) 120242
> > max locked memory   (kbytes, -l) 64
> > max memory size (kbytes, -m) unlimited
> > open files  (-n) 1024
> > pipe size(512 bytes, -p) 8
> > POSIX message queues (bytes, -q) 819200
> > real-time priority  (-r) 0
> > stack size  (kbytes, -s) 8192
> > cpu time   (seconds, -t) unlimited
> > max user processes  (-u) 120242
> > virtual memory  (kbytes, -v) unlimited
> > file locks  (-x) unlimited
> >
> > So at this point I do understand that the I am running out of memory due
> to
> > allocation of threads so my biggest question is how do I tell my spark
> > driver program to not create so many?
> >
> > On Mon, Oct 31, 2016 at 3:25 AM, Sean Owen  wrote:
> >>
> >> ps -L [pid] is what shows threads. I am not sure this is counting what
> you
> >> think it does. My shell process has about a hundred threads, and I can't
> >> imagine why one would have thousands unless your app spawned them.
> >>
> >> On Mon, Oct 31, 2016 at 10:20 AM kant kodali 
> wrote:
> >>>
> >>> when I do
> >>>
> >>> ps -elfT | grep "spark-driver-program.jar" | wc -l
> >>>
> >>> The result is around 32K. why does it create so many threads how can I
> >>> limit this?
> >
> >
>


Re: Submit job with driver options in Mesos Cluster mode

2016-10-31 Thread Michael Gummelt
Can you check if this JIRA is relevant?
https://issues.apache.org/jira/browse/SPARK-2608

If not, can you make a new one?

On Thu, Oct 27, 2016 at 10:27 PM, Rodrick Brown 
wrote:

> Try setting the values in $SPARK_HOME/conf/spark-defaults.conf
>
> i.e.
>
> $ egrep 'spark.(driver|executor).extra' /data/orchard/spark-2.0.1/
> conf/spark-defaults.conf
> spark.executor.extraJavaOptions -Duser.timezone=UTC
> -Xloggc:garbage-collector.log
> spark.driver.extraJavaOptions   -Duser.timezone=UTC
> -Xloggc:garbage-collector.log
>
> --
>
> [image: Orchard Platform] 
>
> Rodrick Brown / DevOPs Engineer
> +1 917 445 6839 / rodr...@orchardplatform.com
> 
>
> Orchard Platform
> 101 5th Avenue, 4th Floor, New York, NY 10003
> http://www.orchardplatform.com
>
> Orchard Blog  | Marketplace Lending
> Meetup 
>
> On Oct 6, 2016, at 12:20 PM, vonnagy  wrote:
>
> I am trying to submit a job to spark running in a Mesos cluster. We need to
> pass custom java options to the driver and executor for configuration, but
> the driver task never includes the options. Here is an example submit.
>
> GC_OPTS="-XX:+UseConcMarkSweepGC
> -verbose:gc -XX:+PrintGCTimeStamps -Xloggc:$appdir/gc.out
> -XX:MaxPermSize=512m
> -XX:+CMSClassUnloadingEnabled "
>
> EXEC_PARAMS="-Dloglevel=DEBUG -Dkafka.broker-address=${KAFKA_ADDRESS}
> -Dredis.master=${REDIS_MASTER} -Dredis.port=${REDIS_PORT}"
>
> spark-submit \
>  --name client-events-intake \
>  --class ClientEventsApp \
>  --deploy-mode cluster \
>  --driver-java-options "${EXEC_PARAMS} ${GC_OPTS}" \
>  --conf "spark.ui.killEnabled=true" \
>  --conf "spark.mesos.coarse=true" \
>  --conf "spark.driver.extraJavaOptions=${EXEC_PARAMS}" \
>  --conf "spark.executor.extraJavaOptions=${EXEC_PARAMS}" \
>  --master mesos://someip:7077 \
>  --verbose \
>  some.jar
>
> When the driver task runs in Mesos it is creating the following command:
>
> sh -c 'cd spark-1*;  bin/spark-submit --name client-events-intake --class
> ClientEventsApp --master mesos://someip:5050 --driver-cores 1.0
> --driver-memory 512M ../some.jar '
>
> There are no options for the driver here, thus the driver app blows up
> because it can't find the java options. However, the environment variables
> contain the executor options:
>
> SPARK_EXECUTOR_OPTS -> -Dspark.executor.extraJavaOptions=-Dloglevel=DEBUG
> ...
>
> Any help would be great. I know that we can set some "spark.*" settings in
> default configs, but these are not necessarily spark related. This is not
> an
> issue when running the same logic outside of a Mesos cluster in Spark
> standalone mode.
>
> Thanks!
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Submit-job-with-driver-options-in-
> Mesos-Cluster-mode-tp27853.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>
> *NOTICE TO RECIPIENTS*: This communication is confidential and intended
> for the use of the addressee only. If you are not an intended recipient of
> this communication, please delete it immediately and notify the sender by
> return email. Unauthorized reading, dissemination, distribution or copying
> of this communication is prohibited. This communication does not constitute
> an offer to sell or a solicitation of an indication of interest to purchase
> any loan, security or any other financial product or instrument, nor is it
> an offer to sell or a solicitation of an indication of interest to purchase
> any products or services to any persons who are prohibited from receiving
> such information under applicable law. The contents of this communication
> may not be accurate or complete and are subject to change without notice.
> As such, Orchard App, Inc. (including its subsidiaries and affiliates,
> "Orchard") makes no representation regarding the accuracy or completeness
> of the information contained herein. The intended recipient is advised to
> consult its own professional advisors, including those specializing in
> legal, tax and accounting matters. Orchard does not provide legal, tax or
> accounting advice.
>



-- 
Michael Gummelt
Software Engineer
Mesosphere


Re: why spark driver program is creating so many threads? How can I limit this number?

2016-10-31 Thread Jakob Odersky
 > how do I tell my spark driver program to not create so many?

This may depend on your driver program. Do you spawn any threads in
it? Could you share some more information on the driver program, spark
version and your environment? It would greatly help others to help you

On Mon, Oct 31, 2016 at 3:47 AM, kant kodali  wrote:
> The source of my problem is actually that I am running into the following
> error. This error seems to happen after running my driver program for 4
> hours.
>
> "Exception in thread "ForkJoinPool-50-worker-11" Exception in thread
> "dag-scheduler-event-loop" Exception in thread "ForkJoinPool-50-worker-13"
> java.lang.OutOfMemoryError: unable to create new native thread"
>
> and this wonderful book taught me that the error "unable to create new
> native thread" can happen because JVM is trying to request the OS for a
> thread and it is refusing to do so for the following reasons
>
> 1. The system has actually run out of virtual memory.
> 2. On Unix-style systems, the user has already created (between all programs
> user is running) the maximum number of processes configured for that user
> login. Individual threads are considered a process in that regard.
>
> Option #2 is ruled out in my case because my driver program is running
> with a userid of root, which has the maximum number of processes set to 120242.
>
> ulimit -a gives me the following
>
> core file size  (blocks, -c) 0
> data seg size   (kbytes, -d) unlimited
> scheduling priority (-e) 0
> file size   (blocks, -f) unlimited
> pending signals (-i) 120242
> max locked memory   (kbytes, -l) 64
> max memory size (kbytes, -m) unlimited
> open files  (-n) 1024
> pipe size(512 bytes, -p) 8
> POSIX message queues (bytes, -q) 819200
> real-time priority  (-r) 0
> stack size  (kbytes, -s) 8192
> cpu time   (seconds, -t) unlimited
> max user processes  (-u) 120242
> virtual memory  (kbytes, -v) unlimited
> file locks  (-x) unlimited
>
> So at this point I do understand that I am running out of memory due to the
> allocation of threads, so my biggest question is how do I tell my Spark
> driver program not to create so many?
>
> On Mon, Oct 31, 2016 at 3:25 AM, Sean Owen  wrote:
>>
>> ps -L [pid] is what shows threads. I am not sure this is counting what you
>> think it does. My shell process has about a hundred threads, and I can't
>> imagine why one would have thousands unless your app spawned them.
>>
>> On Mon, Oct 31, 2016 at 10:20 AM kant kodali  wrote:
>>>
>>> when I do
>>>
>>> ps -elfT | grep "spark-driver-program.jar" | wc -l
>>>
>>> The result is around 32K. why does it create so many threads how can I
>>> limit this?
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: MapWithState partitioning

2016-10-31 Thread Andrii Biletskyi
Thanks,

As I understand it, for the Kafka case the way to do it is to define my own
kafka.Partitioner that is used when data is produced to Kafka, and then
reuse the same partitioning logic as a spark.Partitioner in the mapWithState spec.

I think I'll stick with that.

Thanks,
Andrii

2016-10-31 16:55 GMT+02:00 Cody Koeninger :

> You may know that those streams share the same keys, but Spark doesn't
> unless you tell it.
>
> mapWithState takes a StateSpec, which should allow you to specify a
> partitioner.
>
> On Mon, Oct 31, 2016 at 9:40 AM, Andrii Biletskyi 
> wrote:
> > Thanks for response,
> >
> > So as I understand it, there is no way to "tell" mapWithState to leave the
> > partitioning schema alone, as any other transformation would normally do.
> > Then I would like to clarify if there is a simple way to do a
> transformation
> > to a key-value stream and specify somehow the Partitioner that
> effectively
> > would result in the same partitioning schema as the original stream.
> > I.e.:
> >
> > stream.mapPartitions({ crs =>
> >   crs.map { cr =>
> > cr.key() -> cr.value()
> >   }
> > }) <--- specify somehow Partitioner here for the resulting rdd.
> >
> >
> > The reason I ask is that it simply looks strange to me that Spark will
> > have to shuffle my input stream and the "state" stream on every
> > mapWithState operation when I know for sure that those two streams will
> > always share the same keys and will not need access to other partitions.
> >
> > Thanks,
> > Andrii
> >
> >
> > 2016-10-31 15:45 GMT+02:00 Cody Koeninger :
> >>
> >> If you call a transformation on an rdd using the same partitioner as
> that
> >> rdd, no shuffle will occur.  KafkaRDD doesn't have a partitioner,
> there's no
> >> consistent partitioning scheme that works for all kafka uses. You can
> wrap
> >> each kafkardd with an rdd that has a custom partitioner that you write
> to
> >> match your kafka partitioning scheme, and avoid a shuffle.
> >>
> >> The danger there is if you have any misbehaving producers, or translate
> >> the partitioning wrongly, you'll get bad results. It's safer just to
> >> shuffle.
> >>
> >>
> >> On Oct 31, 2016 04:31, "Andrii Biletskyi"
> >>  wrote:
> >>
> >> Hi all,
> >>
> >> I'm using Spark Streaming mapWithState operation to do a stateful
> >> operation on my Kafka stream (though I think similar arguments would
> apply
> >> for any source).
> >>
> >> Trying to understand a way to control mapWithState's partitioning
> schema.
> >>
> >> My transformations are simple:
> >>
> >> 1) create KafkaDStream
> >> 2) mapPartitions to get a key-value stream where `key` corresponds to
> >> Kafka message key
> >> 3) apply mapWithState operation on key-value stream, the state stream
> >> shares keys with the original stream, the resulting streams doesn't
> change
> >> keys either
> >>
> >> The problem is that, as I understand, mapWithState stream has a
> different
> >> partitioning schema and thus I see shuffles in Spark Web UI.
> >>
> >> From the mapWithState implementation I see that:
> >> mapwithState uses Partitioner if specified, otherwise partitions data
> with
> >> HashPartitioner(). The thing is that original
> >> KafkaDStream has a specific partitioning schema: Kafka partitions
> correspond
> >> Spark RDD partitions.
> >>
> >> Question: is there a way for mapWithState stream to inherit partitioning
> >> schema from the original stream (i.e. correspond to Kafka partitions).
> >>
> >> Thanks,
> >> Andrii
> >>
> >>
> >
>


Re: java.lang.OutOfMemoryError: unable to create new native thread

2016-10-31 Thread Vadim Semenov
Have you tried to get number of threads in a running process using `cat
/proc//status` ?

On Sun, Oct 30, 2016 at 11:04 PM, kant kodali  wrote:

> yes I did run ps -ef | grep "app_name" and it is root.
>
>
>
> On Sun, Oct 30, 2016 at 8:00 PM, Chan Chor Pang 
> wrote:
>
>> sorry, the UID
>>
>> On 10/31/16 11:59 AM, Chan Chor Pang wrote:
>>
>> actually if the max user processes is not the problem, i have no idea
>>
>> but i still suspecting the user,
>> as the user who run spark-submit is not necessary the pid for the JVM
>> process
>>
>> can u make sure when you "ps -ef | grep {your app id} " the PID is root?
>> On 10/31/16 11:21 AM, kant kodali wrote:
>>
>> The java process is run by the root and it has the same config
>>
>> sudo -i
>>
>> ulimit -a
>>
>> core file size  (blocks, -c) 0
>> data seg size   (kbytes, -d) unlimited
>> scheduling priority (-e) 0
>> file size   (blocks, -f) unlimited
>> pending signals (-i) 120242
>> max locked memory   (kbytes, -l) 64
>> max memory size (kbytes, -m) unlimited
>> open files  (-n) 1024
>> pipe size(512 bytes, -p) 8
>> POSIX message queues (bytes, -q) 819200
>> real-time priority  (-r) 0
>> stack size  (kbytes, -s) 8192
>> cpu time   (seconds, -t) unlimited
>> max user processes  (-u) 120242
>> virtual memory  (kbytes, -v) unlimited
>> file locks  (-x) unlimited
>>
>>
>>
>> On Sun, Oct 30, 2016 at 7:01 PM, Chan Chor Pang 
>> wrote:
>>
>>> I have the same Exception before and the problem fix after i change the
>>> nproc conf.
>>>
>>> > max user processes  (-u) 120242
>>> ↑this config does looks good.
>>> are u sure the user who run ulimit -a is the same user who run the Java
>>> process?
>>> depend on how u submit the job and your setting, spark job may execute
>>> by other user.
>>>
>>>
>>> On 10/31/16 10:38 AM, kant kodali wrote:
>>>
>>> when I did this
>>>
>>> cat /proc/sys/kernel/pid_max
>>>
>>> I got 32768
>>>
>>> On Sun, Oct 30, 2016 at 6:36 PM, kant kodali  wrote:
>>>
 I believe for ubuntu it is unlimited but I am not 100% sure (I just
 read somewhere online). I ran ulimit -a and this is what I get

 core file size  (blocks, -c) 0
 data seg size   (kbytes, -d) unlimited
 scheduling priority (-e) 0
 file size   (blocks, -f) unlimited
 pending signals (-i) 120242
 max locked memory   (kbytes, -l) 64
 max memory size (kbytes, -m) unlimited
 open files  (-n) 1024
 pipe size(512 bytes, -p) 8
 POSIX message queues (bytes, -q) 819200
 real-time priority  (-r) 0
 stack size  (kbytes, -s) 8192
 cpu time   (seconds, -t) unlimited
 max user processes  (-u) 120242
 virtual memory  (kbytes, -v) unlimited
 file locks  (-x) unlimited

 On Sun, Oct 30, 2016 at 6:15 PM, Chan Chor Pang  wrote:

> not sure for ubuntu, but i think you can just create the file by
> yourself
> the syntax will be the same as /etc/security/limits.conf
>
> nproc.conf not only limit java process but all process by the same user
> so even the jvm process does nothing,  if the corresponding user is
> busy in other way
> the jvm process will still not able to create new thread.
>
> btw the default limit for centos is 1024
>
>
> On 10/31/16 9:51 AM, kant kodali wrote:
>
>
> On Sun, Oct 30, 2016 at 5:22 PM, Chan Chor Pang <
> chin...@indetail.co.jp> wrote:
>
>> /etc/security/limits.d/90-nproc.conf
>>
>
> Hi,
>
> I am using Ubuntu 16.04 LTS. I have this directory
> /etc/security/limits.d/ but I don't have any files underneath it. This
> error happens after running for 4 to 5 hours. I wonder if this is a GC
> issue? And I am thinking if I should use CMS. I have also posted this on 
> SO
> since I havent got much response for this question
> http://stackoverflow.com/questions/40315589/dag-sch
> eduler-event-loop-java-lang-outofmemoryerror-unable-to-creat
> e-new-native
>
>
> Thanks,
> kant
>
>
> --
> ---**---*---*---*---
> 株式会社INDETAIL
> ニアショア総合サービス事業本部
> ゲームサービス事業部
> 陳 楚鵬
> E-mail :chin...@indetail.co.jp
> URL : http://www.indetail.co.jp
>
> 【札幌本社/LABO/LABO2】
> 〒060-0042
> 札幌市中央区大通西9丁目3番地33
> キタコーセンタービルディング
> (札幌本社/LABO2:2階、LABO:9階)
> TEL:011-206-9235 FAX:011-206-9236
>
> 【東京支店】
> 〒108-0014
> 東京都港区芝5丁目29番20号 クロスオフィス三田
> TEL:03-6809-6502 FAX:03-6809-6504
>
> 

Re: MapWithState partitioning

2016-10-31 Thread Cody Koeninger
You may know that those streams share the same keys, but Spark doesn't
unless you tell it.

mapWithState takes a StateSpec, which should allow you to specify a partitioner.
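
For example, a sketch (assuming an Int-keyed stream; the partitioner and the state function here are placeholders):

import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.streaming.{State, StateSpec}

// Sketch: hand the same Partitioner used for the key-value stream to the
// StateSpec so the state RDD is co-partitioned with the input.
val partitioner: Partitioner = new HashPartitioner(8)  // assumption: matches the input

def updateState(key: Int, value: Option[Int], state: State[Int]): (Int, Int) = {
  val sum = state.getOption.getOrElse(0) + value.getOrElse(0)
  state.update(sum)
  (key, sum)
}

val spec = StateSpec.function(updateState _).partitioner(partitioner)
// keyValueStream.mapWithState(spec)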

On Mon, Oct 31, 2016 at 9:40 AM, Andrii Biletskyi  wrote:
> Thanks for response,
>
> So as I understand it, there is no way to "tell" mapWithState to leave the
> partitioning schema alone, as any other transformation would normally do.
> Then I would like to clarify if there is a simple way to do a transformation
> to a key-value stream and specify somehow the Partitioner that effectively
> would result in the same partitioning schema as the original stream.
> I.e.:
>
> stream.mapPartitions({ crs =>
>   crs.map { cr =>
> cr.key() -> cr.value()
>   }
> }) <--- specify somehow Partitioner here for the resulting rdd.
>
>
> The reason I ask is that it simply looks strange to me that Spark will have
> to shuffle my input stream and the "state" stream on every
> mapWithState operation when I know for sure that those two streams will
> always share the same keys and will not need access to other partitions.
>
> Thanks,
> Andrii
>
>
> 2016-10-31 15:45 GMT+02:00 Cody Koeninger :
>>
>> If you call a transformation on an rdd using the same partitioner as that
>> rdd, no shuffle will occur.  KafkaRDD doesn't have a partitioner, there's no
>> consistent partitioning scheme that works for all kafka uses. You can wrap
>> each kafkardd with an rdd that has a custom partitioner that you write to
>> match your kafka partitioning scheme, and avoid a shuffle.
>>
>> The danger there is if you have any misbehaving producers, or translate
>> the partitioning wrongly, you'll get bad results. It's safer just to
>> shuffle.
>>
>>
>> On Oct 31, 2016 04:31, "Andrii Biletskyi"
>>  wrote:
>>
>> Hi all,
>>
>> I'm using Spark Streaming mapWithState operation to do a stateful
>> operation on my Kafka stream (though I think similar arguments would apply
>> for any source).
>>
>> Trying to understand a way to control mapWithState's partitioning schema.
>>
>> My transformations are simple:
>>
>> 1) create KafkaDStream
>> 2) mapPartitions to get a key-value stream where `key` corresponds to
>> Kafka message key
>> 3) apply mapWithState operation on key-value stream, the state stream
>> shares keys with the original stream, the resulting streams doesn't change
>> keys either
>>
>> The problem is that, as I understand, mapWithState stream has a different
>> partitioning schema and thus I see shuffles in Spark Web UI.
>>
>> From the mapWithState implementation I see that:
>> mapwithState uses Partitioner if specified, otherwise partitions data with
>> HashPartitioner(). The thing is that original
>> KafkaDStream has a specific partitioning schema: Kafka partitions correspond
>> Spark RDD partitions.
>>
>> Question: is there a way for mapWithState stream to inherit partitioning
>> schema from the original stream (i.e. correspond to Kafka partitions).
>>
>> Thanks,
>> Andrii
>>
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



MapWithState with large state

2016-10-31 Thread Abhishek Singh
Can it handle state that is large than what memory will hold?


Re: Efficient filtering on Spark SQL dataframes with ordered keys

2016-10-31 Thread Michael David Pedersen
Hi Mich,

Thank you again for your reply.

As I see you are caching the table already sorted
>
> val keyValRDDSorted = keyValRDD.sortByKey().cache
>
> and the next stage is you are creating multiple tempTables (different
> ranges) that cache a subset of rows already cached in RDD. The data stored
> in tempTable is in Hive columnar format (I assume that means ORC format)
>

But the thing is that I don't explicitly cache the tempTables, and I don't
really want to because I'll only run a single query on each tempTable. So I
expect the SQL query processor to operate directly on the underlying
key-value RDD, and my concern is that this may be inefficient.


> Well that is all you can do.
>

Ok, thanks - that's really what I wanted to get confirmation of.


> Bear in mind that these tempTables are immutable and I do not know any way
> of dropping tempTable to free more memory.
>

I'm assuming there won't be any (significant) memory overhead of
registering the temp tables as long as I don't explicitly cache them. Am I
wrong? In any case I'll be calling sqlContext.dropTempTable once the query
has completed, which according to the documentation should also free up
memory.
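
Concretely, the lifecycle I have in mind looks roughly like this (a sketch reusing the names from my original post):

// Sketch: register the temp view, run the query, then drop the registration.
def queryRangeAndDrop(lower: String, upper: String, sql: String, tableName: String) = {
  val rangeRDD = keyValRDDSorted.filterByRange(lower, upper)
  val rangeDF = sqlContext.createDataFrame(rangeRDD.map(_._2), df.schema)
  rangeDF.createTempView(tableName)
  try sqlContext.sql(sql).collect()
  finally sqlContext.dropTempTable(tableName)
}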

Cheers,
Michael


Re: MapWithState partitioning

2016-10-31 Thread Cody Koeninger
If you call a transformation on an rdd using the same partitioner as that
rdd, no shuffle will occur.  KafkaRDD doesn't have a partitioner, there's
no consistent partitioning scheme that works for all kafka uses. You can
wrap each kafkardd with an rdd that has a custom partitioner that you write
to match your kafka partitioning scheme, and avoid a shuffle.

The danger there is if you have any misbehaving producers, or translate the
partitioning wrongly, you'll get bad results. It's safer just to shuffle.
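
A rough sketch of such a wrapper (names are hypothetical, and the partitioner you assert must genuinely match how your producers assigned Kafka partitions, which is exactly the risk above):

import scala.reflect.ClassTag
import org.apache.spark.{Partition, Partitioner, TaskContext}
import org.apache.spark.rdd.RDD

// Sketch: a thin wrapper that only *asserts* a partitioner over an RDD whose
// data is already laid out that way (e.g. a KafkaRDD whose Spark partitions
// mirror Kafka partitions). Asserting the wrong partitioner gives wrong results.
class AssertedPartitionerRDD[K: ClassTag, V: ClassTag](
    prev: RDD[(K, V)],
    part: Partitioner)
  extends RDD[(K, V)](prev) {

  override val partitioner: Option[Partitioner] = Some(part)
  override protected def getPartitions: Array[Partition] = prev.partitions
  override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] =
    prev.iterator(split, context)
}

// e.g. inside transform(): new AssertedPartitionerRDD(pairRdd, myKafkaLikePartitioner)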

On Oct 31, 2016 04:31, "Andrii Biletskyi"
 wrote:

Hi all,

I'm using Spark Streaming mapWithState operation to do a stateful operation
on my Kafka stream (though I think similar arguments would apply for any
source).

Trying to understand a way to control mapWithState's partitioning schema.

My transformations are simple:

1) create KafkaDStream
2) mapPartitions to get a key-value stream where `key` corresponds to Kafka
message key
3) apply the mapWithState operation on the key-value stream; the state stream
shares keys with the original stream, and the resulting stream doesn't change
keys either

The problem is that, as I understand, mapWithState stream has a different
partitioning schema and thus I see shuffles in Spark Web UI.

From the mapWithState implementation I see that:
mapWithState uses the Partitioner if specified, otherwise it partitions data with
HashPartitioner(). The thing is that the original
KafkaDStream has a specific partitioning schema: Kafka partitions
correspond to Spark RDD partitions.

Question: is there a way for mapWithState stream to inherit partitioning
schema from the original stream (i.e. correspond to Kafka partitions).

Thanks,
Andrii


Re: Efficient filtering on Spark SQL dataframes with ordered keys

2016-10-31 Thread Mich Talebzadeh
Hi Michael,

As I see you are caching the table already sorted

val keyValRDDSorted = keyValRDD.sortByKey().cache

and the next stage is you are creating multiple tempTables (different
ranges) that cache a subset of rows already cached in RDD. The data stored
in tempTable is in Hive columnar format (I assume that means ORC format)

Well that is all you can do. Bear in mind that these tempTables are
immutable and I do not know any way of dropping tempTable to free more
memory.

Depending on the size of the main table, caching the whole table may
require a lot of memory, but you can see this in the UI storage page.
An alternative is to use persist(StorageLevel.MEMORY_AND_DISK_SER) for a
mix of cached and on-disk storage.
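
For example (a sketch, reusing the keyValRDD from your post):

import org.apache.spark.storage.StorageLevel

// Sketch: keep the sorted RDD partly on disk when it does not fit in memory.
val keyValRDDSorted = keyValRDD.sortByKey()
  .persist(StorageLevel.MEMORY_AND_DISK_SER)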

HTH







Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 31 October 2016 at 10:55, Michael David Pedersen <
michael.d.peder...@googlemail.com> wrote:

> Hi Mich,
>
> Thank you for your quick reply!
>
> What type of table is the underlying table? Is it Hbase, Hive ORC or what?
>>
>
> It is a custom datasource, but ultimately backed by HBase.
>
>
>> By Key you mean a UNIQUE ID or something similar and then you do multiple
>> scans on the tempTable which stores data using in-memory columnar format.
>>
>
> The key is a unique ID, yes. But note that I don't actually do multiple
> scans on the same temp table: I create a new temp table for every query I
> want to run, because each query will be based on a different key range. The
> caching is at the level of the full key-value RDD.
>
> If I did instead cache the temp table, I don't see a way of exploiting key
> ordering for key range filters?
>
>
>> That is the optimisation of tempTable storage as far as I know.
>>
>
> So it seems to me that my current solution won't be using this
> optimisation, as I'm caching the RDD rather than the temp table.
>
>
>> Have you tried it using predicate push-down on the underlying table
>> itself?
>>
>
> No, because I essentially want to load the entire table into memory before
> doing any queries. At that point I have nothing to push down.
>
> Cheers,
> Michael
>


RE: Help needed in parsing JSon with nested structures

2016-10-31 Thread Jan Botorek
Hello,
From my point of view, it would be more efficient and probably more
"readable" if you just extracted the required data using some JSON parsing
library (GSON, Jackson), constructed a global object (or pre-processed the
data), and then began with the Spark operations.
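
A minimal Scala sketch of that approach using Jackson's tree model (field names taken from the schema below; the file path is a placeholder):

import scala.collection.JavaConverters._
import scala.io.Source
import com.fasterxml.jackson.databind.ObjectMapper

// Sketch: walk the nested arrays once so code and qtdAmount stay paired,
// then build the requested map keyed by code + "qtdAmount".
val mapper = new ObjectMapper()
val root = mapper.readTree(Source.fromFile("cdm.json").mkString)

val taxMap: Map[String, Double] =
  root.path("businessEntity").elements.asScala.flatMap { be =>
    be.path("payGroup").elements.asScala.flatMap { pg =>
      pg.path("reportingPeriod").path("worker").elements.asScala.flatMap { w =>
        w.path("tax").elements.asScala.map { t =>
          (t.path("code").asText + "qtdAmount") -> t.path("qtdAmount").asDouble
        }
      }
    }
  }.toMap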

Jan

From: Kappaganthu, Sivaram (ES) [mailto:sivaram.kappagan...@adp.com]
Sent: Monday, October 31, 2016 11:50 AM
To: user@spark.apache.org
Subject: Help needed in parsing JSon with nested structures

Hello All,



I am processing a nested complex Json and below is the schema for it.
root
 |-- businessEntity: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- payGroup: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- reportingPeriod: struct (nullable = true)
 |    |    |    |    |    |-- worker: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- category: string (nullable = true)
 |    |    |    |    |    |    |    |-- person: struct (nullable = true)
 |    |    |    |    |    |    |    |-- tax: array (nullable = true)
 |    |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |    |    |-- code: string (nullable = true)
 |    |    |    |    |    |    |    |    |    |-- qtdAmount: double (nullable = true)
 |    |    |    |    |    |    |    |    |    |-- ytdAmount: double (nullable =
My requirement is to create a hashmap with code concatenated with "qtdAmount" as
the key and the value of qtdAmount as the value: Map.put(code + "qtdAmount", qtdAmount).
How can I do this with Spark?
I tried the shell commands below.
import org.apache.spark.sql._
val sqlcontext = new SQLContext(sc)
val cdm = sqlcontext.read.json("/user/edureka/CDM/cdm.json")
val spark = 
SparkSession.builder().appName("SQL").config("spark.some.config.option","some-vale").getOrCreate()
cdm.createOrReplaceTempView("CDM")
val sqlDF = spark.sql("SELECT businessEntity[0].payGroup[0] from CDM").show()
val address = spark.sql("SELECT 
businessEntity[0].payGroup[0].reportingPeriod.worker[0].person.address from CDM 
as address")
val worker = spark.sql("SELECT 
businessEntity[0].payGroup[0].reportingPeriod.worker from CDM")
val tax = spark.sql("SELECT 
businessEntity[0].payGroup[0].reportingPeriod.worker[0].tax from CDM")
val tax = sqlcontext.sql("SELECT 
businessEntity[0].payGroup[0].reportingPeriod.worker[0].tax from CDM")
val codes = tax.select(explode(tax("code")))
scala> val codes = 
tax.withColumn("code",explode(tax("tax.code"))).withColumn("qtdAmount",explode(tax("tax.qtdAmount"))).withColumn("ytdAmount",explode(tax("tax.ytdAmount")))


I am trying to get all the codes and qtdAmounts into a map, but I am not getting
it. Using multiple explode statements on a single DataFrame produces a Cartesian
product of the elements.
Could someone please help with how to parse JSON this complex in
Spark?


Thanks,
Sivaram


This message and any attachments are intended only for the use of the addressee 
and may contain information that is privileged and confidential. If the reader 
of the message is not the intended recipient or an authorized representative of 
the intended recipient, you are hereby notified that any dissemination of this 
communication is strictly prohibited. If you have received this communication 
in error, notify the sender immediately by return email and delete the message 
and any attachments from your system.


Re: Efficient filtering on Spark SQL dataframes with ordered keys

2016-10-31 Thread Michael David Pedersen
Hi Mich,

Thank you for your quick reply!

What type of table is the underlying table? Is it Hbase, Hive ORC or what?
>

It is a custom datasource, but ultimately backed by HBase.


> By Key you mean a UNIQUE ID or something similar and then you do multiple
> scans on the tempTable which stores data using in-memory columnar format.
>

The key is a unique ID, yes. But note that I don't actually do multiple
scans on the same temp table: I create a new temp table for every query I
want to run, because each query will be based on a different key range. The
caching is at the level of the full key-value RDD.

If I did instead cache the temp table, I don't see a way of exploiting key
ordering for key range filters?


> That is the optimisation of tempTable storage as far as I know.
>

So it seems to me that my current solution won't be using this
optimisation, as I'm caching the RDD rather than the temp table.


> Have you tried it using predicate push-down on the underlying table itself?
>

No, because I essentially want to load the entire table into memory before
doing any queries. At that point I have nothing to push down.

Cheers,
Michael


Help needed in parsing JSon with nested structures

2016-10-31 Thread Kappaganthu, Sivaram (ES)
Hello All,



I am processing a nested complex Json and below is the schema for it.
root
 |-- businessEntity: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- payGroup: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- reportingPeriod: struct (nullable = true)
 |    |    |    |    |    |-- worker: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- category: string (nullable = true)
 |    |    |    |    |    |    |    |-- person: struct (nullable = true)
 |    |    |    |    |    |    |    |-- tax: array (nullable = true)
 |    |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |    |    |-- code: string (nullable = true)
 |    |    |    |    |    |    |    |    |    |-- qtdAmount: double (nullable = true)
 |    |    |    |    |    |    |    |    |    |-- ytdAmount: double (nullable =
My requirement is to create a hashmap with code concatenated with "qtdAmount" as
the key and the value of qtdAmount as the value: Map.put(code + "qtdAmount", qtdAmount).
How can I do this with Spark?
I tried the shell commands below.
import org.apache.spark.sql._
val sqlcontext = new SQLContext(sc)
val cdm = sqlcontext.read.json("/user/edureka/CDM/cdm.json")
val spark = 
SparkSession.builder().appName("SQL").config("spark.some.config.option","some-vale").getOrCreate()
cdm.createOrReplaceTempView("CDM")
val sqlDF = spark.sql("SELECT businessEntity[0].payGroup[0] from CDM").show()
val address = spark.sql("SELECT 
businessEntity[0].payGroup[0].reportingPeriod.worker[0].person.address from CDM 
as address")
val worker = spark.sql("SELECT 
businessEntity[0].payGroup[0].reportingPeriod.worker from CDM")
val tax = spark.sql("SELECT 
businessEntity[0].payGroup[0].reportingPeriod.worker[0].tax from CDM")
val tax = sqlcontext.sql("SELECT 
businessEntity[0].payGroup[0].reportingPeriod.worker[0].tax from CDM")
val codes = tax.select(explode(tax("code")))
scala> val codes = 
tax.withColumn("code",explode(tax("tax.code"))).withColumn("qtdAmount",explode(tax("tax.qtdAmount"))).withColumn("ytdAmount",explode(tax("tax.ytdAmount")))


I am trying to get all the codes and qtdAmounts into a map, but I am not getting
it. Using multiple explode statements on a single DataFrame produces a Cartesian
product of the elements.
Could someone please help with how to parse JSON this complex in
Spark?


Thanks,
Sivaram

--
This message and any attachments are intended only for the use of the addressee 
and may contain information that is privileged and confidential. If the reader 
of the message is not the intended recipient or an authorized representative of 
the intended recipient, you are hereby notified that any dissemination of this 
communication is strictly prohibited. If you have received this communication 
in error, notify the sender immediately by return email and delete the message 
and any attachments from your system.


Re: why spark driver program is creating so many threads? How can I limit this number?

2016-10-31 Thread kant kodali
The source of my problem is actually that I am running into the following
error. This error seems to happen after running my driver program for 4
hours.

"Exception in thread "ForkJoinPool-50-worker-11" Exception in thread
"dag-scheduler-event-loop" Exception in thread "ForkJoinPool-50-worker-13"
java.lang.OutOfMemoryError: unable to create new native thread"

and this wonderful book taught me that the error "unable to create new
native thread" can happen because the JVM asks the OS for a new thread and
the OS refuses, for the following reasons:

1. The system has actually run out of virtual memory.
2. On Unix-style systems, the user has already created (across all
programs the user is running) the maximum number of processes configured for
that user login. Individual threads are considered a process in that
regard.

Option #2 is ruled out in my case because my driver program is running
with a userid of root, which has the maximum number of processes set to 120242.

ulimit -a gives me the following

core file size  (blocks, -c) 0
data seg size   (kbytes, -d) unlimited
scheduling priority (-e) 0
file size   (blocks, -f) unlimited
pending signals (-i) 120242
max locked memory   (kbytes, -l) 64
max memory size (kbytes, -m) unlimited
open files  (-n) 1024
pipe size(512 bytes, -p) 8
POSIX message queues (bytes, -q) 819200
real-time priority  (-r) 0
stack size  (kbytes, -s) 8192
cpu time   (seconds, -t) unlimited
max user processes  (-u) 120242
virtual memory  (kbytes, -v) unlimited
file locks  (-x) unlimited

So at this point I do understand that I am running out of memory due to the
allocation of threads, so my biggest question is: how do I tell my Spark
driver program not to create so many?
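
For reference, a quick way to see which pools keep growing is to dump the driver's live threads grouped by name prefix (a sketch, run inside the driver JVM):

import scala.collection.JavaConverters._

// Sketch: count live JVM threads by name prefix so runaway pools such as
// "ForkJoinPool-*" or "dag-scheduler-*" stand out.
val byPrefix = Thread.getAllStackTraces.keySet.asScala.toSeq
  .groupBy(_.getName.replaceAll("-?\\d+.*$", ""))
  .mapValues(_.size)
  .toSeq
  .sortBy(-_._2)

byPrefix.take(20).foreach { case (prefix, n) => println(s"$n\t$prefix") }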

On Mon, Oct 31, 2016 at 3:25 AM, Sean Owen  wrote:

> ps -L [pid] is what shows threads. I am not sure this is counting what you
> think it does. My shell process has about a hundred threads, and I can't
> imagine why one would have thousands unless your app spawned them.
>
> On Mon, Oct 31, 2016 at 10:20 AM kant kodali  wrote:
>
>> when I do
>>
>> ps -elfT | grep "spark-driver-program.jar" | wc -l
>>
>> The result is around 32K. why does it create so many threads how can I
>> limit this?
>>
>


Re: Efficient filtering on Spark SQL dataframes with ordered keys

2016-10-31 Thread Mich Talebzadeh
Hi Michael.

What type of table is the underlying table? Is it Hbase, Hive ORC or what?

By Key you mean a UNIQUE ID or something similar and then you do multiple
scans on the tempTable which stores data using in-memory columnar format.

That is the optimisation of tempTable storage as far as I know.

Have you tried it using predicate push-down on the underlying table itself?
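
For example (a sketch reusing the df from your post; whether the predicate is actually pushed down depends on the datasource, e.g. on it implementing PrunedFilteredScan):

import org.apache.spark.sql.functions.col

// Sketch: filter before any caching so the source can prune on the key range.
val rangeDF = df.filter(col("key") >= lower && col("key") < upper)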

HTH




Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 31 October 2016 at 10:06, Michael David Pedersen <
michael.d.peder...@googlemail.com> wrote:

> Hello,
>
> I've got a Spark SQL dataframe containing a "key" column. The queries I
> want to run start by filtering on the key range. My question in outline: is
> it possible to sort the dataset by key so as to do efficient key range
> filters, before subsequently running a more complex SQL query?
>
> I'm aware that such efficient filtering is possible for key-value RDDs,
> i.e. RDDs over Tuple2, using PairRDDFunctions. My workflow currently looks
> as follows:
>
> // Create a dataframe
> val df: DataFrame = sqlContext.sql("SELECT * FROM ...")
> val keyValRDD = df.rdd.map( (r: Row) => (r.getAs[String]("key"), r) )
>
> // Sort by key - and cache.
> val keyValRDDSorted = keyValRDD.sortByKey().cache
>
> // Define a function to run SQL query on a range.
> def queryRange(lower: String, upper: String, sql: String, tableName:
> String) = {
> val rangeRDD = keyValRDDSorted.filterByRange(lower, upper)
> val rangeDF = sqlContext.createDataFrame(rangeRDD.map{ _._2 },
> df.schema)
> rangeDF.createTempView(tableName)
> sqlContext.sql(sql)
> }
>
> // Invoke multiple times.
> queryRange(...)
> queryRange(...)
> ...
>
> This works, and is efficient in that only the partitions containing the
> relevant key range are processed. However, I understand that Spark SQL uses
> an optimised storage format as compared to plain RDDs. The above workflow
> can't take advantage of this, as it is the key-value RDD that is cached.
>
> So, my specific question: Is there a more efficient way of achieving the
> desired result?
>
> Any pointers would be much appreciated.
>
> Many thanks,
> Michael
>
> PS: This question was also asked on StackOverflow -
> http://stackoverflow.com/questions/40129411/efficient-
> filtering-on-spark-sql-dataframes-with-ordered-keys.
>


Re: why spark driver program is creating so many threads? How can I limit this number?

2016-10-31 Thread Sean Owen
ps -L [pid] is what shows threads. I am not sure this is counting what you
think it does. My shell process has about a hundred threads, and I can't
imagine why one would have thousands unless your app spawned them.

On Mon, Oct 31, 2016 at 10:20 AM kant kodali  wrote:

> when I do
>
> ps -elfT | grep "spark-driver-program.jar" | wc -l
>
> The result is around 32K. why does it create so many threads how can I
> limit this?
>


why spark driver program is creating so many threads? How can I limit this number?

2016-10-31 Thread kant kodali
when I do

ps -elfT | grep "spark-driver-program.jar" | wc -l

The result is around 32K. Why does it create so many threads, and how can I
limit this?


Efficient filtering on Spark SQL dataframes with ordered keys

2016-10-31 Thread Michael David Pedersen
Hello,

I've got a Spark SQL dataframe containing a "key" column. The queries I
want to run start by filtering on the key range. My question in outline: is
it possible to sort the dataset by key so as to do efficient key range
filters, before subsequently running a more complex SQL query?

I'm aware that such efficient filtering is possible for key-value RDDs,
i.e. RDDs over Tuple2, using PairRDDFunctions. My workflow currently looks
as follows:

// Create a dataframe
val df: DataFrame = sqlContext.sql("SELECT * FROM ...")
val keyValRDD = df.rdd.map( (r: Row) => (r.getAs[String]("key"), r) )

// Sort by key - and cache.
val keyValRDDSorted = keyValRDD.sortByKey().cache

// Define a function to run SQL query on a range.
def queryRange(lower: String, upper: String, sql: String, tableName:
String) = {
val rangeRDD = keyValRDDSorted.filterByRange(lower, upper)
val rangeDF = sqlContext.createDataFrame(rangeRDD.map{ _._2 },
df.schema)
rangeDF.createTempView(tableName)
sqlContext.sql(sql)
}

// Invoke multiple times.
queryRange(...)
queryRange(...)
...

This works, and is efficient in that only the partitions containing the
relevant key range are processed. However, I understand that Spark SQL uses
an optimised storage format as compared to plain RDDs. The above workflow
can't take advantage of this, as it is the key-value RDD that is cached.

So, my specific question: Is there a more efficient way of achieving the
desired result?

Any pointers would be much appreciated.

Many thanks,
Michael

PS: This question was also asked on StackOverflow -
http://stackoverflow.com/questions/40129411/efficient-filtering-on-spark-sql-dataframes-with-ordered-keys
.


Re: Do you use spark 2.0 in work?

2016-10-31 Thread Andy Dang
This is my personal email so I can't exactly discuss work-related topics.

But yes, many teams in my company use Spark 2.0 in production environment.
What are the challenges that prevent you from adopting it (besides
migration from Spark 1.x)?

---
Regards,
Andy

On Mon, Oct 31, 2016 at 8:16 AM, Yang Cao  wrote:

> Hi guys,
>
> Just for personal interest: I wonder whether Spark 2.0 is a production-ready
> version? Does any company use this version as its main version in daily
> work? THX
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


MapWithState partitioning

2016-10-31 Thread Andrii Biletskyi
Hi all,

I'm using Spark Streaming mapWithState operation to do a stateful operation
on my Kafka stream (though I think similar arguments would apply for any
source).

Trying to understand a way to control mapWithState's partitioning schema.

My transformations are simple:

1) create KafkaDStream
2) mapPartitions to get a key-value stream where `key` corresponds to Kafka
message key
3) apply the mapWithState operation on the key-value stream; the state stream
shares keys with the original stream, and the resulting stream doesn't change
keys either

The problem is that, as I understand, mapWithState stream has a different
partitioning schema and thus I see shuffles in Spark Web UI.

From the mapWithState implementation I see that:
mapWithState uses the Partitioner if specified, otherwise it partitions data with
HashPartitioner(). The thing is that the original
KafkaDStream has a specific partitioning schema: Kafka partitions
correspond to Spark RDD partitions.

Question: is there a way for mapWithState stream to inherit partitioning
schema from the original stream (i.e. correspond to Kafka partitions).

Thanks,
Andrii


Do you use spark 2.0 in work?

2016-10-31 Thread Yang Cao
Hi guys,

Just for personal interest: I wonder whether Spark 2.0 is a production-ready version? Does
any company use this version as its main version in daily work? THX
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



task not serializable in case of groupByKey() + mapGroups + map?

2016-10-31 Thread Yang
with the following simple code


val a =
sc.createDataFrame(sc.parallelize(Seq((1,2),(3,4)))).as[(Int,Int)]
val grouped = a.groupByKey({x:(Int,Int)=>x._1})
val mappedGroups = grouped.mapGroups((k,x)=>{(k,1)})
val yyy = sc.broadcast(1)
val last = mappedGroups.rdd.map(xx=>{
  val simpley = yyy.value

  1
})



I'm seeing error:
org.apache.spark.SparkException: Task not serializable
  at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2053)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
  at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
  at org.apache.spark.rdd.RDD.map(RDD.scala:365)
  ... 56 elided
Caused by: java.io.NotSerializableException:
org.apache.spark.sql.execution.QueryExecution
Serialization stack:
- object not serializable (class:
org.apache.spark.sql.execution.QueryExecution, value: == Parsed Logical
Plan ==
'AppendColumns , unresolveddeserializer(newInstance(class
scala.Tuple2)), [input[0, int, true] AS value#210]
+- LogicalRDD [_1#201, _2#202]

== Analyzed Logical Plan ==
_1: int, _2: int, value: int
AppendColumns , newInstance(class scala.Tuple2), [input[0, int,
true] AS value#210]
+- LogicalRDD [_1#201, _2#202]

== Optimized Logical Plan ==
AppendColumns , newInstance(class scala.Tuple2), [input[0, int,
true] AS value#210]
+- LogicalRDD [_1#201, _2#202]

== Physical Plan ==
AppendColumns , newInstance(class scala.Tuple2), [input[0, int,
true] AS value#210]
+- Scan ExistingRDD[_1#201,_2#202])
- field (class: org.apache.spark.sql.KeyValueGroupedDataset, name:
queryExecution, type: class org.apache.spark.sql.execution.QueryExecution)
- object (class org.apache.spark.sql.KeyValueGroupedDataset,
org.apache.spark.sql.KeyValueGroupedDataset@71148f10)
- field (class: $iw, name: grouped, type: class
org.apache.spark.sql.KeyValueGroupedDataset)
- object (class $iw, $iw@7b1c13e4)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3e9a0c21)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@218cc682)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@2ecedd08)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@79efd402)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@d81976c)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@2d5d6e2a)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@74dc6a7a)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5e220d85)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@1c790a4f)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@1d954b06)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@1343c904)
- field (class: $line115.$read, name: $iw, type: class $iw)
- object (class $line115.$read, $line115.$read@42497908)
- field (class: $iw, name: $line115$read, type: class
$line115.$read)
- object (class $iw, $iw@af36da5)
- field (class: $iw, name: $outer, type: class $iw)
- object (class $iw, $iw@2fd5b99a)
- field (class: $anonfun$1, name: $outer, type: class $iw)
- object (class $anonfun$1, )
  at
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
  ... 65 more
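
The serialization stack above shows the REPL line wrapper ($iw) that holds `grouped` (a KeyValueGroupedDataset carrying a non-serializable QueryExecution) being pulled into the closure together with `yyy`. One workaround sketch is to build the pipeline inside a method, so the final map closure only captures the broadcast it actually uses:

import org.apache.spark.sql.SparkSession

// Sketch: locals inside a method are captured individually, so the closure
// below only serializes the broadcast `yyy`, not the dataset wrapper objects.
def run(spark: SparkSession): Long = {
  import spark.implicits._
  val a = spark.createDataset(Seq((1, 2), (3, 4)))
  val grouped = a.groupByKey { case (k, _) => k }
  val mappedGroups = grouped.mapGroups((k, _) => (k, 1))
  val yyy = spark.sparkContext.broadcast(1)
  mappedGroups.rdd.map(_ => yyy.value).count()
}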