Re: I have trained a ML model, now what?

2019-01-23 Thread Pola Yao
Hi Riccardo,

Right now, Spark does not support low-latency predictions in production.
MLeap is an alternative and has been used in many scenarios. But it's good
to see that the Spark community has decided to provide such support.
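
For the offline/batch option Riccardo describes below, a minimal sketch with
plain Spark ML is often enough. This assumes the fitted PipelineModel was
previously saved to storage; the paths and the id/prediction column names are
just placeholders:

'''
import org.apache.spark.ml.PipelineModel
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("batch-scoring").getOrCreate()

// Load the previously trained pipeline (feature transformers + model).
val model = PipelineModel.load("/models/my-pipeline")          // placeholder path

// Score a batch of new records and persist the predictions to a datastore.
val input  = spark.read.parquet("/data/new-records")           // placeholder path
val scored = model.transform(input).select("id", "prediction")
scored.write.mode("overwrite").parquet("/output/predictions")  // or JDBC, a search index, ...

spark.stop()
'''

With this setup nothing latency-sensitive touches a SparkContext; web requests
only read precomputed predictions from the datastore.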

On Wed, Jan 23, 2019 at 7:53 AM Riccardo Ferrari  wrote:

> Felix, thank you very much for the link. Much appreciated.
>
> The attached PDF is very interesting; I found myself evaluating many of
> the scenarios described in Q3. It's unfortunate the proposal is not being
> worked on; it would be great to see that become part of the code base.
>
> It is cool to see big players like Uber trying to make Open Source better,
> thanks!
>
>
> On Tue, Jan 22, 2019 at 5:24 PM Felix Cheung 
> wrote:
>
>> About deployment/serving
>>
>> SPIP
>> https://issues.apache.org/jira/browse/SPARK-26247
>>
>>
>> --
>> *From:* Riccardo Ferrari 
>> *Sent:* Tuesday, January 22, 2019 8:07 AM
>> *To:* User
>> *Subject:* I have trained a ML model, now what?
>>
>> Hi list!
>>
>> I am writing here to hear about your experience with putting Spark ML
>> models into production at scale.
>>
>> I know it is a very broad topic with many different faces depending on
>> the use case, requirements, user base and whatever else is involved in the
>> task. Still, I'd like to open a thread about this topic: it is as important
>> as properly training a model, yet I feel it is often neglected.
>>
>> The task is *serving web users with predictions*, and the main challenge
>> I see is keeping it fast and easy to iterate on.
>>
>> I think such deployments fall into three general categories:
>>
>>    - Offline/Batch: load a model, perform the inference, and store the
>>    results in some datastore (DB, indexes, ...).
>>    - Spark in the loop: keep a long-running Spark context exposed in some
>>    way; this includes streaming as well as a custom application that wraps
>>    the context.
>>    - A different technology: load the Spark MLlib model and run the
>>    inference pipeline elsewhere. I have read about MLeap and other
>>    PMML-based solutions.
>>
>> I would love to hear about open-source solutions, ideally without
>> requiring cloud-provider-specific frameworks/components.
>>
>> Again, I am aware each of these categories has benefits and drawbacks,
>> so which would you pick? Why? And how?
>>
>> Thanks!
>>
>


Re: How to force-quit a Spark application?

2019-01-22 Thread Pola Yao
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
...
'''

What shall I do then? Thanks!


On Wed, Jan 16, 2019 at 1:15 PM Marcelo Vanzin  wrote:

> Those are daemon threads and not the cause of the problem. The main
> thread is waiting for the "org.apache.hadoop.util.ShutdownHookManager"
> thread, but I don't see that one in your list.
>
> On Wed, Jan 16, 2019 at 12:08 PM Pola Yao  wrote:
> >
> > Hi Marcelo,
> >
> > Thanks for your response.
> >
> > I have dumped the threads on the server where I submitted the Spark
> application:
> >
> > '''
> > ...
> > "dispatcher-event-loop-2" #28 daemon prio=5 os_prio=0
> tid=0x7f56cee0e000 nid=0x1cb6 waiting on condition [0x7f5699811000]
> >java.lang.Thread.State: WAITING (parking)
> > at sun.misc.Unsafe.park(Native Method)
> > - parking to wait for  <0x0006400161b8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> > at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> > at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> > at
> org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
> > at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> > at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> > at java.lang.Thread.run(Thread.java:745)
> >
> > "dispatcher-event-loop-1" #27 daemon prio=5 os_prio=0
> tid=0x7f56cee0c800 nid=0x1cb5 waiting on condition [0x7f5699912000]
> >java.lang.Thread.State: WAITING (parking)
> > at sun.misc.Unsafe.park(Native Method)
> > - parking to wait for  <0x0006400161b8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> > at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> > at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> > at
> org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
> > at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> > at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> > at java.lang.Thread.run(Thread.java:745)
> >
> > "dispatcher-event-loop-0" #26 daemon prio=5 os_prio=0
> tid=0x7f56cee0c000 nid=0x1cb4 waiting on condition [0x7f569a12]
> >java.lang.Thread.State: WAITING (parking)
> > at sun.misc.Unsafe.park(Native Method)
> > - parking to wait for  <0x0006400161b8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> > at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> > at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> > at
> org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
> > at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> > at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> > at java.lang.Thread.run(Thread.java:745)
> >
> > "Service Thread" #20 daemon prio=9 os_prio=0 tid=0x7f56cc12d800
> nid=0x1ca5 runnable [0x]
> >java.lang.Thread.State: RUNNABLE
> >
> > "C1 CompilerThread14" #19 daemon prio=9 os_prio=0 tid=0x7f56cc12a000
> nid=0x1ca4 waiting on condition [0x]
> >java.lang.Thread.State: RUNNABLE
> > ...
> > "Finalizer" #3 daemon prio=8 os_prio=0 tid=0x7f56cc0ce000 nid=0x1c93
> in Object.wait() [0x7f56ab3f2000]
> >java.lang.Thread.State: WAITING (on object monitor)
> > at java.lang.Object.wait(Native Method)
> > at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> > - locked <0x0006400cd498> (a java.lang.ref.ReferenceQueue$Lock)
> > at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
> > at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)
> >
> > "Reference Handler" #2 daemon prio=10 os_prio=0 

Re: How to force-quit a Spark application?

2019-01-16 Thread Pola Yao
scala.sys.package$.exit(package.scala:40)
at scala.sys.package$.exit(package.scala:33)
at
actionmodel.ParallelAdvertiserBeaconModel$.main(ParallelAdvertiserBeaconModel.scala:252)
at
actionmodel.ParallelAdvertiserBeaconModel.main(ParallelAdvertiserBeaconModel.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:879)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

"VM Thread" os_prio=0 tid=0x7f56cc0c1800 nid=0x1c91 runnable
...
'''

I have no clear idea what went wrong. I did call awaitTermination to
terminate the thread pool. Is there any way to force-close all those
'WAITING' threads associated with my Spark application?
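
For what it's worth, one workaround I am considering (a sketch only, not yet
tested; it assumes the non-daemon pool threads are what keep the JVM alive) is
to build the pool with a daemon ThreadFactory and, as a last resort, exit
explicitly after spark.stop():

'''
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.ExecutionContext

// Mark every worker thread as a daemon so the pool cannot keep the JVM alive.
val daemonFactory = new ThreadFactory {
  def newThread(r: Runnable): Thread = {
    val t = new Thread(r)
    t.setDaemon(true)
    t
  }
}

val pool = Executors.newFixedThreadPool(3, daemonFactory)
implicit val xc = ExecutionContext.fromExecutorService(pool)

// ... run the training Futures and Await.result as before ...

pool.shutdown()
xc.shutdown()
spark.stop()
sys.exit(0)   // last resort: force the JVM down if a stray thread remains
'''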

On Wed, Jan 16, 2019 at 8:31 AM Marcelo Vanzin  wrote:

> If System.exit() doesn't work, you may have a bigger problem
> somewhere. Check your threads (using e.g. jstack) to see what's going
> on.
>
> On Wed, Jan 16, 2019 at 8:09 AM Pola Yao  wrote:
> >
> > Hi Marcelo,
> >
> > Thanks for your reply! It made sense to me. However, I've tried many
> ways to exit Spark (e.g., System.exit()), but they all failed. Is there an
> explicit way to shut down all the live threads in the Spark application and
> then quit?
> >
> >
> > On Tue, Jan 15, 2019 at 2:38 PM Marcelo Vanzin 
> wrote:
> >>
> >> You should check the active threads in your app. Since your pool uses
> >> non-daemon threads, that will prevent the app from exiting.
> >>
> >> spark.stop() should have stopped the Spark jobs in other threads, at
> >> least. But if something is blocking one of those threads, or if
> >> something is creating a non-daemon thread that stays alive somewhere,
> >> you'll see that.
> >>
> >> Or you can force quit with sys.exit.
> >>
> >> On Tue, Jan 15, 2019 at 1:30 PM Pola Yao  wrote:
> >> >
> >> > I submitted a Spark job through the ./spark-submit command; the code
> executed successfully, however, the application got stuck when trying to
> quit Spark.
> >> >
> >> > My code snippet:
> >> > '''
> >> > {
> >> >
> >> > val spark = SparkSession.builder.master(...).getOrCreate
> >> >
> >> > val pool = Executors.newFixedThreadPool(3)
> >> > implicit val xc = ExecutionContext.fromExecutorService(pool)
> >> > val taskList = List(train1, train2, train3)  // where each train* is a
> Future that wraps up data reading, feature engineering and machine learning
> steps
> >> > val results = Await.result(Future.sequence(taskList), 20 minutes)
> >> >
> >> > println("Shutting down pool and executor service")
> >> > pool.shutdown()
> >> > xc.shutdown()
> >> >
> >> > println("Exiting spark")
> >> > spark.stop()
> >> >
> >> > }
> >> > '''
> >> >
> >> > After I submitted the job, from the terminal I could see the code
> executing and printing "Exiting spark"; however, after printing that line,
> it never exited Spark, it just got stuck.
> >> >
> >> > Does anybody know what the reason is? Or how can I force it to quit?
> >> >
> >> > Thanks!
> >> >
> >> >
> >>
> >>
> >> --
> >> Marcelo
>
>
>
> --
> Marcelo
>


Re: How to force-quit a Spark application?

2019-01-16 Thread Pola Yao
Hi Marcelo,

Thanks for your reply! It made sense to me. However, I've tried many ways
to exit Spark (e.g., System.exit()), but they all failed. Is there an
explicit way to shut down all the live threads in the Spark application and
then quit?


On Tue, Jan 15, 2019 at 2:38 PM Marcelo Vanzin  wrote:

> You should check the active threads in your app. Since your pool uses
> non-daemon threads, that will prevent the app from exiting.
>
> spark.stop() should have stopped the Spark jobs in other threads, at
> least. But if something is blocking one of those threads, or if
> something is creating a non-daemon thread that stays alive somewhere,
> you'll see that.
>
> Or you can force quit with sys.exit.
>
> On Tue, Jan 15, 2019 at 1:30 PM Pola Yao  wrote:
> >
> > I submitted a Spark job through the ./spark-submit command; the code
> executed successfully, however, the application got stuck when trying to
> quit Spark.
> >
> > My code snippet:
> > '''
> > {
> >
> > val spark = SparkSession.builder.master(...).getOrCreate
> >
> > val pool = Executors.newFixedThreadPool(3)
> > implicit val xc = ExecutionContext.fromExecutorService(pool)
> > val taskList = List(train1, train2, train3)  // where each train* is a
> Future that wraps up data reading, feature engineering and machine learning
> steps
> > val results = Await.result(Future.sequence(taskList), 20 minutes)
> >
> > println("Shutting down pool and executor service")
> > pool.shutdown()
> > xc.shutdown()
> >
> > println("Exiting spark")
> > spark.stop()
> >
> > }
> > '''
> >
> > After I submitted the job, from the terminal I could see the code
> executing and printing "Exiting spark"; however, after printing that line,
> it never exited Spark, it just got stuck.
> >
> > Does anybody know what the reason is? Or how can I force it to quit?
> >
> > Thanks!
> >
> >
>
>
> --
> Marcelo
>


How to force-quit a Spark application?

2019-01-15 Thread Pola Yao
I submitted a Spark job through the ./spark-submit command; the code
executed successfully, however, the application got stuck when trying to
quit Spark.

My code snippet:
'''
{

val spark = SparkSession.builder.master(...).getOrCreate

val pool = Executors.newFixedThreadPool(3)
implicit val xc = ExecutionContext.fromExecutorService(pool)
val taskList = List(train1, train2, train3)  // where each train* is a Future
that wraps up data reading, feature engineering and machine learning steps
val results = Await.result(Future.sequence(taskList), 20 minutes)

println("Shutting down pool and executor service")
pool.shutdown()
xc.shutdown()

println("Exiting spark")
spark.stop()

}
'''

After I submitted the job, from the terminal I could see the code executing
and printing "Exiting spark"; however, after printing that line, it never
exited Spark, it just got stuck.

Does anybody know what the reason is? Or how can I force it to quit?

Thanks!


[Spark-ml]Error in training ML models: Missing an output location for shuffle xxx

2019-01-07 Thread Pola Yao
Hi Spark Community,

I was using XGBoost-Spark to train a machine learning model. The dataset
was not large (around 1 GB). I used the following command to submit my
application:
'''

./bin/spark-submit \
  --master yarn --deploy-mode client \
  --num-executors 50 --executor-cores 2 --executor-memory 3g \
  --driver-memory 8g \
  --conf spark.executor.memoryOverhead=2g \
  --conf spark.network.timeout=2000s \
  --class XXX --jars /path/to/jars \
  /path/to/application
'''

I got the following error:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 58

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
output location for shuffle 58
at 
org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:867)
at 
org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:863)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at 
org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:863)
at 
org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:677)
at 
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at 
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:100)
at 
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:99)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
at 
scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
at 
ml.dmlc.xgboost4j.java.DataBatch$BatchIterator.hasNext(DataBatch.java:47)
at ml.dmlc.xgboost4j.java.XGBoostJNI.XGDMatrixCreateFromDataIter(Native 
Method)
at ml.dmlc.xgboost4j.java.DMatrix.<init>(DMatrix.java:53)
at ml.dmlc.xgboost4j.scala.DMatrix.<init>(DMatrix.scala:42)
at 
ml.dmlc.xgboost4j.scala.spark.Watches$.buildWatches(XGBoost.scala:436)
at 
ml.dmlc.xgboost4j.scala.spark.XGBoost$$anonfun$trainDistributed$4$$anonfun$12.apply(XGBoost.scala:276)
at 
ml.dmlc.xgboost4j.scala.spark.XGBoost$$anonfun$trainDistributed$4$$anonfun$12.apply(XGBoost.scala:275)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

The error occurred at foreachPartition at XGBoost.scala:287.


Does anybody know what caused the error? Was it a memory issue?

Thanks!


[spark-ml] How to write a Spark Application correctly?

2019-01-02 Thread Pola Yao
Hello Spark Community,

I have a dataset of size 20 GB with 20 columns. Each column is categorical,
so I applied a string indexer and one-hot encoding to every column.
Afterwards, I applied a vector assembler to all the newly derived columns to
form a feature vector for each record, and then fed the feature vectors to an
ML algorithm.
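
For concreteness, the feature-engineering part looks roughly like the sketch
below (the column names are placeholders, and I am assuming Spark 2.3+, where
the multi-column one-hot estimator is called OneHotEncoderEstimator):

'''
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{OneHotEncoderEstimator, StringIndexer, VectorAssembler}

val catCols = (1 to 20).map(i => s"c$i")   // the 20 categorical columns (placeholder names)

// One StringIndexer per column, then a single one-hot encoder over the indexed columns.
val indexers = catCols.map { c =>
  new StringIndexer().setInputCol(c).setOutputCol(s"${c}_idx").setHandleInvalid("keep")
}
val encoder = new OneHotEncoderEstimator()
  .setInputCols(catCols.map(c => s"${c}_idx").toArray)
  .setOutputCols(catCols.map(c => s"${c}_vec").toArray)
val assembler = new VectorAssembler()
  .setInputCols(catCols.map(c => s"${c}_vec").toArray)
  .setOutputCol("features")

val stages: Array[PipelineStage] = (indexers :+ encoder :+ assembler).toArray
val featurized = new Pipeline().setStages(stages).fit(df).transform(df)   // df is the 20 GB input
'''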

However, during the feature engineering steps, I observed from the Spark UI
(the Executors tab) that the input size increased dramatically to 600 GB+.
The cluster I used might not have that much capacity. Are there any ways to
optimize the memory usage of the intermediate results?

Thanks.


Fwd: Train multiple machine learning models in parallel

2018-12-19 Thread Pola Yao
Hi Community,

I have a 1 TB dataset which contains records for 50 users. Each user has
around 20 GB of data on average.

I want to use Spark to train a machine learning model (e.g., an XGBoost tree
model) for each user. Ideally, the result would be 50 models. However, it
would be infeasible to submit 50 separate Spark jobs through 'spark-submit'.

The model parameters and feature engineering steps for each user's data
would be exactly the same. Is there a way to train these 50 models in
parallel within a single application?
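
The rough shape I have in mind is to launch one training Future per user
inside a single application (a sketch only; the userId column, the paths and
the trainModel helper are placeholders):

'''
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("per-user-models").getOrCreate()
import spark.implicits._

val data    = spark.read.parquet("/data/all-users")            // placeholder path
val userIds = data.select("userId").distinct().as[String].collect()

// A bounded pool so only a few training jobs hit the cluster at the same time.
implicit val xc = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

val modelFutures = userIds.toSeq.map { id =>
  Future {
    val userData = data.filter($"userId" === id)
    (id, trainModel(userData))   // trainModel = the shared feature-engineering + XGBoost pipeline
  }
}

val models = Await.result(Future.sequence(modelFutures), 12.hours)
'''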

Thanks!