Re: Spark performance testing

2016-07-08 Thread Andrew Ehrlich
Yea, I'm looking for any personal experiences people have had with tools like 
these. 

> On Jul 8, 2016, at 8:57 PM, charles li  wrote:
> 
> Hi, Andrew, I've got lots of materials when asking google for "spark 
> performance test"
> 
> https://github.com/databricks/spark-perf
> https://spark-summit.org/2014/wp-content/uploads/2014/06/Testing-Spark-Best-Practices-Anupama-Shetty-Neil-Marshall.pdf
> http://people.cs.vt.edu/~butta/docs/tpctc2015-sparkbench.pdf
> 
> 
>> On Sat, Jul 9, 2016 at 11:40 AM, Andrew Ehrlich  wrote:
>> Hi group,
>> 
>> What solutions are people using to do performance testing and tuning of 
>> spark applications? I have been doing a pretty manual technique where I lay 
>> out an Excel sheet of various memory settings and caching parameters and 
>> then execute each one by hand. It’s pretty tedious though, so I’m wondering 
>> what others do, and if you do performance testing at all.  Also, is anyone 
>> generating test data, or just operating on a static set? Is regression 
>> testing for performance a thing?
>> 
>> Andrew
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 
> 
> -- 
> ___
> Quant | Engineer | Boy
> ___
> blog:http://litaotao.github.io
> github: www.github.com/litaotao


Re: Spark performance testing

2016-07-08 Thread charles li
Hi, Andrew, I've got lots of materials when asking google for "*spark
performance test*"


   - https://github.com/databricks/spark-perf
   - https://spark-summit.org/2014/wp-content/uploads/2014/06/Testing-Spark-Best-Practices-Anupama-Shetty-Neil-Marshall.pdf
   - http://people.cs.vt.edu/~butta/docs/tpctc2015-sparkbench.pdf



On Sat, Jul 9, 2016 at 11:40 AM, Andrew Ehrlich  wrote:

> Hi group,
>
> What solutions are people using to do performance testing and tuning of
> spark applications? I have been doing a pretty manual technique where I lay
> out an Excel sheet of various memory settings and caching parameters and
> then execute each one by hand. It’s pretty tedious though, so I’m wondering
> what others do, and if you do performance testing at all.  Also, is anyone
> generating test data, or just operating on a static set? Is regression
> testing for performance a thing?
>
> Andrew
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
*___*
Quant | Engineer | Boy
*___*
*blog*:http://litaotao.github.io
*github*: www.github.com/litaotao


Re: Is there a way to dynamic load files [ parquet or csv ] in the map function?

2016-07-08 Thread Deepak Sharma
Yes. You can do something like this:
.map(x => mapfunction(x))

Thanks
Deepak
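
Since the inline code screenshot in the original mail did not survive in the archive, here is a minimal sketch of one common interpretation, assuming each record carries the path of a parquet file to load. The SQLContext/SparkContext only exist on the driver and cannot be used inside map() on the executors, so one workaround is to collect the distinct paths to the driver and read them there (recordsRDD and extractPath are placeholder names):

// Sketch only: assumes a spark-shell style session where sc and sqlContext already exist.
// Gather the parquet paths on the driver, then read them all in one go.
val paths: Array[String] = recordsRDD.map(extractPath).distinct().collect()
val combined = sqlContext.read.parquet(paths: _*)  // one DataFrame over all the files
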
On 9 Jul 2016 9:22 am, "charles li"  wrote:

>
> hi, guys, is there a way to dynamically load files within the map function?
>
> i.e.
>
> Can I code as below:
>
> [inline code screenshot not preserved in the archive]
>
> thanks a lot.
>
>
> --
> *___*
> ​  ​
> Quant | Engineer | Boy
> *___*
> *blog*:http://litaotao.github.io
> *github*: www.github.com/litaotao
>


Is there a way to dynamic load files [ parquet or csv ] in the map function?

2016-07-08 Thread charles li
hi, guys, is there a way to dynamically load files within the map function?

i.e.

Can I code as below:

[inline code screenshot not preserved in the archive]

thanks a lot.


-- 
*___*
​  ​
Quant | Engineer | Boy
*___*
*blog*:http://litaotao.github.io
*github*: www.github.com/litaotao


Spark performance testing

2016-07-08 Thread Andrew Ehrlich
Hi group,

What solutions are people using to do performance testing and tuning of spark 
applications? I have been doing a pretty manual technique where I lay out an 
Excel sheet of various memory settings and caching parameters and then execute 
each one by hand. It’s pretty tedious though, so I’m wondering what others do, 
and if you do performance testing at all.  Also, is anyone generating test 
data, or just operating on a static set? Is regression testing for performance 
a thing?

Andrew
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Presentation in London: Running Spark on Hive or Hive on Spark

2016-07-08 Thread mylisttech
Hi Mich,

Would it be on YouTube, post-session?

- Harmeet



On Jul 7, 2016, at 3:07, Mich Talebzadeh  wrote:

> Dear forum members
> 
> I will be presenting on the topic of "Running Spark on Hive or Hive on Spark, 
> your mileage varies" in Future of Data: London 
> 
> Details
> 
> Organized by: Hortonworks
> 
> Date: Wednesday, July 20, 2016, 6:00 PM to 8:30 PM 
> 
> Place: London
> 
> Location: One Canada Square, Canary Wharf,  London E14 5AB.
> 
> Nearest Underground: Canary Wharf (map)
> 
> If you are interested please register here
> 
> Looking forward to seeing those who can make it to have an interesting 
> discussion and leverage your experience.
> 
> Regards,
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> http://talebzadehmich.wordpress.com
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  


Re: DataFrame Min By Column

2016-07-08 Thread Xinh Huynh
Hi Pedro,

I could not think of a way using an aggregate. It's possible with a window
function, partitioned on user and ordered by time:

// Assuming "df" holds your dataframe ...

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val wSpec = Window.partitionBy("user").orderBy("time")
df.select($"user", $"time", rank().over(wSpec).as("rank"))
  .where($"rank" === 1)

Xinh
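
For completeness, a minimal aggregate-only sketch (no window function), assuming a DataFrame df with columns "user", "time" and "site", and a Spark version whose min/max aggregates support struct columns: the min of a struct is decided by its first field, so put the ordering column first and unpack it afterwards.

import org.apache.spark.sql.functions._

// min over a struct orders on the struct's first field ("time"),
// so this returns the earliest (time, site) pair per user.
val firstEvents = df
  .groupBy(col("user"))
  .agg(min(struct(col("time"), col("site"))).as("first"))
  .select(col("user"), col("first.time"), col("first.site"))
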

On Fri, Jul 8, 2016 at 12:57 PM, Pedro Rodriguez 
wrote:

> Is there a way to on a GroupedData (from groupBy in DataFrame) to have an
> aggregate that returns column A based on a min of column B? For example, I
> have a list of sites visited by a given user and I would like to find the
> event with the minimum time (first event)
>
> Thanks,
> --
> Pedro Rodriguez
> PhD Student in Distributed Machine Learning | CU Boulder
> UC Berkeley AMPLab Alumni
>
> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
> Github: github.com/EntilZha | LinkedIn:
> https://www.linkedin.com/in/pedrorodriguezscience
>
>


Broadcast hash join implementation in Spark

2016-07-08 Thread Lalitha MV
Hi,

1. What implementation is used for the hash join -- is it classic hash join
or Hybrid grace hash join?
2. If the hash table does not fit in memory, does it spill or does it fail?
Are there parameters to control this (for example to set the percentage of
hash table which can spill etc.)
3. Is the size of the hash table fixed, or can we change it?

Thanks in advance,
Lalitha
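
Not an answer to the internals questions, but for reference a minimal sketch of the knobs Spark SQL does expose: the broadcast side is chosen for relations below spark.sql.autoBroadcastJoinThreshold (in bytes, -1 disables it), and the broadcast() hint forces a particular side. `small` and `large` are placeholder DataFrames.

import org.apache.spark.sql.functions.broadcast

// Raise/lower the automatic threshold (bytes); -1 turns broadcast joins off.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (10 * 1024 * 1024).toString)

// Or force a broadcast hash join of the small side explicitly.
val joined = large.join(broadcast(small), Seq("key"))
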


Isotonic Regression, run method overloaded Error

2016-07-08 Thread dsp
Hi I am trying to perform Isotonic Regression on a data set with 9 features
and a label. 
When I run the algorithm similar to the way mentioned on MLlib page, I get
the error saying

error: overloaded method value run with alternatives:
  (input: org.apache.spark.api.java.JavaRDD[(java.lang.Double, java.lang.Double,
  java.lang.Double)])org.apache.spark.mllib.regression.IsotonicRegressionModel
  (input: org.apache.spark.rdd.RDD[(scala.Double, scala.Double,
  scala.Double)])org.apache.spark.mllib.regression.IsotonicRegressionModel
 cannot be applied to (org.apache.spark.rdd.RDD[(scala.Double, scala.Double,
scala.Double, scala.Double, scala.Double, scala.Double, scala.Double,
scala.Double, scala.Double, scala.Double, scala.Double, scala.Double,
scala.Double)])
 val model = new IsotonicRegression().setIsotonic(true).run(training)

From the way it is given in the sample code, it looks like this can be done only for a
dataset with a single feature, because the run() method accepts only three-element
tuples, one of which is the label and one the (default) weight, leaving room for
only one feature.
So, how can this be done for multiple variables?

Regards,
Swaroop
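
For reference, a minimal sketch of the input shape that MLlib's IsotonicRegression.run() accepts: an RDD of (label, feature, weight) triples, i.e. a single feature per point, which is why the 13-element tuples above are rejected. Here `data` is a placeholder RDD[(Double, Array[Double])], and picking the first feature is purely for illustration.

import org.apache.spark.mllib.regression.IsotonicRegression

// run() expects RDD[(Double, Double, Double)] = (label, single feature, weight).
val training = data.map { case (label: Double, features: Array[Double]) =>
  (label, features(0), 1.0)
}
val model = new IsotonicRegression().setIsotonic(true).run(training)
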



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Isotonic-Regression-run-method-overloaded-Error-tp27313.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Is that possible to launch spark streaming application on yarn with only one machine?

2016-07-08 Thread Rabin Banerjee
Ya, I mean dump it to HDFS as a file, via yarn-cluster mode.
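
A minimal sketch of what that could look like, assuming `stream` is the DStream whose print() output is missing in yarn-cluster mode and the HDFS path is a placeholder; each batch ends up in its own timestamped directory:

// Writes every micro-batch to HDFS instead of println on the driver.
stream.saveAsTextFiles("hdfs://namenode:8020/tmp/mqtt-debug/batch", "txt")
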
On Jul 8, 2016 3:10 PM, "Yu Wei"  wrote:

> How could I dump data into text file? Writing to HDFS or other approach?
>
>
> Thanks,
>
> Jared
> --
> *From:* Rabin Banerjee 
> *Sent:* Thursday, July 7, 2016 7:04:29 PM
> *To:* Yu Wei
> *Cc:* Mich Talebzadeh; user; Deng Ching-Mallete
> *Subject:* Re: Is that possible to launch spark streaming application on
> yarn with only one machine?
>
>
> In that case, I suspect that MQTT is not getting data while you are
> submitting in yarn-cluster mode.
>
> Can you please try dumping the data to a text file instead of printing, while
> submitting in yarn-cluster mode?
> On Jul 7, 2016 12:46 PM, "Yu Wei"  wrote:
>
>> Yes. Thanks for your clarification.
>>
>> The problem I encountered is that in yarn cluster mode, no output for
>> "DStream.print()" in yarn logs.
>>
>>
>> In spark implementation org/apache/spark/streaming/dstream/DStream.scala,
>> the logs related with "Time" was printed out. However, other information
>> for firstNum.take(num).foreach(println) was not printed in logs.
>>
>> What's the root cause for the behavior difference?
>>
>>
>> /**
>>* Print the first ten elements of each RDD generated in this DStream.
>> This is an output
>>* operator, so this DStream will be registered as an output stream and
>> there materialized.
>>*/
>>   def print(): Unit = ssc.withScope {
>> print(10)
>>   }
>>
>>   /**
>>* Print the first num elements of each RDD generated in this DStream.
>> This is an output
>>* operator, so this DStream will be registered as an output stream and
>> there materialized.
>>*/
>>   def print(num: Int): Unit = ssc.withScope {
>> def foreachFunc: (RDD[T], Time) => Unit = {
>>   (rdd: RDD[T], time: Time) => {
>> val firstNum = rdd.take(num + 1)
>> // scalastyle:off println
>> println("---")
>> println("Time: " + time)
>> println("---")
>> firstNum.take(num).foreach(println)
>> if (firstNum.length > num) println("...")
>> println()
>> // scalastyle:on println
>>   }
>> }
>>
>> Thanks,
>>
>> Jared
>>
>>
>> --
>> *From:* Rabin Banerjee 
>> *Sent:* Thursday, July 7, 2016 1:04 PM
>> *To:* Yu Wei
>> *Cc:* Mich Talebzadeh; Deng Ching-Mallete; user@spark.apache.org
>> *Subject:* Re: Is that possible to launch spark streaming application on
>> yarn with only one machine?
>>
>> In yarn-cluster mode, the driver runs in the AM, so you can find the logs
>> in that AM's log. Open the ResourceManager UI and check for the job and logs,
>> or run yarn logs -applicationId 
>>
>> In yarn-client mode, the driver is the same JVM from which you are
>> launching, so you are getting it in the log.
>>
>> On Thu, Jul 7, 2016 at 7:56 AM, Yu Wei  wrote:
>>
>>> Launching via client deploy mode, it works again.
>>>
>>> I'm still a little confused about the behavior difference for cluster
>>> and client mode on a single machine.
>>>
>>>
>>> Thanks,
>>>
>>> Jared
>>> --
>>> *From:* Mich Talebzadeh 
>>> *Sent:* Wednesday, July 6, 2016 9:46:11 PM
>>> *To:* Yu Wei
>>> *Cc:* Deng Ching-Mallete; user@spark.apache.org
>>>
>>> *Subject:* Re: Is that possible to launch spark streaming application
>>> on yarn with only one machine?
>>>
>>> I don't think deploy-mode cluster will work.
>>>
>>> Try --master yarn --deploy-mode client
>>>
>>> FYI
>>>
>>>
>>>    - *Spark Local* - Spark runs on the local host. This is the simplest
>>>    set up and best suited for learners who want to understand different
>>>    concepts of Spark and those performing unit testing.
>>>    - *Spark Standalone* - a simple cluster manager included with Spark
>>>    that makes it easy to set up a cluster.
>>>    - *YARN Cluster Mode* - the Spark driver runs inside an application
>>>    master process which is managed by YARN on the cluster, and the client
>>>    can go away after initiating the application. This is invoked with
>>>    --master yarn and --deploy-mode cluster.
>>>    - *YARN Client Mode* - the driver runs in the client process, and the
>>>    application master is only used for requesting resources from YARN.
>>>    Unlike Spark standalone mode, in which the master's address is specified
>>>    in the --master parameter, in YARN mode the ResourceManager's address is
>>>    picked up from the Hadoop configuration. Thus, the --master parameter is
>>>    yarn. This is invoked with --deploy-mode client.
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> 

DataFrame Min By Column

2016-07-08 Thread Pedro Rodriguez
Is there a way to on a GroupedData (from groupBy in DataFrame) to have an
aggregate that returns column A based on a min of column B? For example, I
have a list of sites visited by a given user and I would like to find the
event with the minimum time (first event)

Thanks,
-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: Unresponsive Spark Streaming UI in YARN cluster mode - 1.5.2

2016-07-08 Thread Shixiong(Ryan) Zhu
Hey Tom,

Could you provide all blocked threads? Perhaps due to some potential
deadlock.

On Fri, Jul 8, 2016 at 10:30 AM, Ellis, Tom (Financial Markets IT) <
tom.el...@lloydsbanking.com.invalid> wrote:

> Hi There,
>
>
>
> We’re currently using HDP 2.3.4, Spark 1.5.2 with a Spark Streaming job in
> YARN Cluster mode consuming from a high volume Kafka topic. When we try to
> access the Spark Streaming UI on the application master, it is
> unresponsive/hangs or sometimes comes back with connection refused.
>
>
>
> It seems this UI is resident on the driver, and looking at its thread dump
> we see the below. Other tabs in the UI are fine. Does anyone have any
> ideas? Any further info required just ask.
>
>
>
> Thread 258: qtp1595613401-258 - /streaming/ (BLOCKED)
>
>
> org.apache.spark.streaming.DStreamGraph.getInputStreamName(DStreamGraph.scala:114)
>
>
> org.apache.spark.streaming.ui.StreamingJobProgressListener.streamName(StreamingJobProgressListener.scala:188)
>
>
> org.apache.spark.streaming.ui.StreamingPage$$anonfun$21.apply(StreamingPage.scala:429)
>
>
> org.apache.spark.streaming.ui.StreamingPage$$anonfun$21.apply(StreamingPage.scala:429)
>
> scala.Option.orElse(Option.scala:257)
>
> org.apache.spark.streaming.ui.StreamingPage.org
> $apache$spark$streaming$ui$StreamingPage$$generateInputDStreamRow(StreamingPage.scala:429)
>
>
> org.apache.spark.streaming.ui.StreamingPage$$anonfun$18.apply(StreamingPage.scala:396)
>
>
> org.apache.spark.streaming.ui.StreamingPage$$anonfun$18.apply(StreamingPage.scala:395)
>
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
>
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>
> scala.collection.AbstractTraversable.map(Traversable.scala:105)
>
>
> org.apache.spark.streaming.ui.StreamingPage.generateInputDStreamsTable(StreamingPage.scala:395)
>
>
> org.apache.spark.streaming.ui.StreamingPage.generateStatTable(StreamingPage.scala:348)
>
> org.apache.spark.streaming.ui.StreamingPage.render(StreamingPage.scala:157)
>
> org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:79)
>
> org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:79)
>
> org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:69)
>
> javax.servlet.http.HttpServlet.service(HttpServlet.java:735)
>
> javax.servlet.http.HttpServlet.service(HttpServlet.java:848)
>
>
> org.spark-project.jetty.servlet.ServletHolder.handle(ServletHolder.java:684)
>
>
> org.spark-project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501)
>
>
> org.spark-project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086)
>
>
> org.spark-project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:428)
>
>
> org.spark-project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020)
>
>
> org.spark-project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
>
>
> org.spark-project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:255)
>
>
> org.spark-project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
>
> org.spark-project.jetty.server.Server.handle(Server.java:366)
>
>
> org.spark-project.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494)
>
>
> org.spark-project.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:971)
>
>
> org.spark-project.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:1033)
>
> org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:644)
>
> org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
>
>
> org.spark-project.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
>
>
> org.spark-project.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:667)
>
>
> org.spark-project.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52)
>
>
> org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
>
>
> org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
>
> java.lang.Thread.run(Thread.java:745)
>
>
>
> Thread 33: SparkListenerBus (BLOCKED)
>
>
> org.apache.spark.streaming.ui.StreamingJobProgressListener.onJobStart(StreamingJobProgressListener.scala:123)
>
>
> org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:34)
>
>
> org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
>
>
> org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
>
> org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:56)
>
>
> 

Re: Random Forest Classification

2016-07-08 Thread Bryan Cutler
Hi Rich,

I looked at the notebook and it seems like you are fitting the
StringIndexer and VectorIndexer to only the training data, and they should be
fit to the entire data set.  So if the training data does not include all of
the labels and an unknown label appears in the test data during evaluation,
then it will not know how to index it.  So your code should be like this,
fitting with 'digits' instead of 'training':

val labelIndexer = new
StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(digits)
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as
continuous.
val featureIndexer = new
VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").setMaxCategories(4).fit(digits)

Hope that helps!

On Fri, Jul 1, 2016 at 9:24 AM, Rich Tarro  wrote:

> Hi Bryan.
>
> Thanks for your continued help.
>
> Here is the code shown in a Jupyter notebook. I figured this was easier
> than cutting and pasting the code into an email. If you would like me to
> send you the code in a different format let, me know. The necessary data is
> all downloaded within the notebook itself.
>
>
> https://console.ng.bluemix.net/data/notebooks/fe7e578a-401f-4744-a318-b1b6bcf6f5f8/view?access_token=2f6df7b1dfcb3c1c2d94a794506bb282729dab8f05118fafe5f11886326e02fc
>
> A few additional pieces of information.
>
> 1. The training dataset is cached before training the model. If you do not
> cache the training dataset, the model will not train. The code
> model.transform(test) fails with a similar error. No other changes besides
> caching or not caching. Again, with the training dataset cached, the model
> can be successfully trained as seen in the notebook.
>
> 2. I have another version of the notebook where I download the same data
> in libsvm format rather than csv. That notebook works fine. All the code is
> essentially the same accounting for the difference in file formats.
>
> 3. I tested this same code on another Spark cloud platform and it displays
> the same symptoms when run there.
>
> Thanks.
> Rich
>
>
> On Wed, Jun 29, 2016 at 12:59 AM, Bryan Cutler  wrote:
>
>> Are you fitting the VectorIndexer to the entire data set and not just
>> training or test data?  If you are able to post your code and some data to
>> reproduce, that would help in troubleshooting.
>>
>> On Tue, Jun 28, 2016 at 4:40 PM, Rich Tarro  wrote:
>>
>>> Thanks for the response, but in my case I reversed the meaning of
>>> "prediction" and "predictedLabel". It seemed to make more sense to me that
>>> way, but in retrospect, it probably only causes confusion to anyone else
>>> looking at this. I reran the code with all the pipeline stage inputs and
>>> outputs named exactly as in the Random Forest Classifier example to make
>>> sure I hadn't messed anything up when I renamed things. Same error.
>>>
>>> I'm still at the point where I can train the model and make predictions,
>>> but not able to get the MulticlassClassificationEvaluator to work on
>>> the DataFrame of predictions.
>>>
>>> Any other suggestions? Thanks.
>>>
>>>
>>>
>>> On Tue, Jun 28, 2016 at 4:21 PM, Rich Tarro  wrote:
>>>
 I created an ML pipeline using the Random Forest Classifier - similar to
 what is described here except in my case the source data is in csv format
 rather than libsvm.


 https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier

 I am able to successfully train the model and make predictions (on test
 data not used to train the model) as shown here.

 +------------+--------------+-----+----------+--------------------+
 |indexedLabel|predictedLabel|label|prediction|            features|
 +------------+--------------+-----+----------+--------------------+
 |         4.0|           4.0|    0|         0|(784,[124,125,126...|
 |         2.0|           2.0|    3|         3|(784,[119,120,121...|
 |         8.0|           8.0|    8|         8|(784,[180,181,182...|
 |         0.0|           0.0|    1|         1|(784,[154,155,156...|
 |         3.0|           8.0|    2|         8|(784,[148,149,150...|
 +------------+--------------+-----+----------+--------------------+
 only showing top 5 rows

 However, when I attempt to calculate the error between the indexedLabel 
 and the predictedLabel using the MulticlassClassificationEvaluator, I get 
 the NoSuchElementException error attached below.

 val evaluator = new 
 MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("predictedLabel").setMetricName("precision")
 val accuracy = evaluator.evaluate(predictions)
 println("Test Error = " + (1.0 - accuracy))

 What could be the issue?



 Name: org.apache.spark.SparkException
 Message: Job aborted due to 

Unresponsive Spark Streaming UI in YARN cluster mode - 1.5.2

2016-07-08 Thread Ellis, Tom (Financial Markets IT)
Hi There,

We're currently using HDP 2.3.4, Spark 1.5.2 with a Spark Streaming job in YARN 
Cluster mode consuming from a high volume Kafka topic. When we try to access 
the Spark Streaming UI on the application master, it is unresponsive/hangs or 
sometimes comes back with connection refused.

It seems this UI is resident on the driver, and looking at its thread dump we 
see the below. Other tabs in the UI are fine. Does anyone have any ideas? Any 
further info required just ask.

Thread 258: qtp1595613401-258 - /streaming/ (BLOCKED)
org.apache.spark.streaming.DStreamGraph.getInputStreamName(DStreamGraph.scala:114)
org.apache.spark.streaming.ui.StreamingJobProgressListener.streamName(StreamingJobProgressListener.scala:188)
org.apache.spark.streaming.ui.StreamingPage$$anonfun$21.apply(StreamingPage.scala:429)
org.apache.spark.streaming.ui.StreamingPage$$anonfun$21.apply(StreamingPage.scala:429)
scala.Option.orElse(Option.scala:257)
org.apache.spark.streaming.ui.StreamingPage.org$apache$spark$streaming$ui$StreamingPage$$generateInputDStreamRow(StreamingPage.scala:429)
org.apache.spark.streaming.ui.StreamingPage$$anonfun$18.apply(StreamingPage.scala:396)
org.apache.spark.streaming.ui.StreamingPage$$anonfun$18.apply(StreamingPage.scala:395)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
scala.collection.AbstractTraversable.map(Traversable.scala:105)
org.apache.spark.streaming.ui.StreamingPage.generateInputDStreamsTable(StreamingPage.scala:395)
org.apache.spark.streaming.ui.StreamingPage.generateStatTable(StreamingPage.scala:348)
org.apache.spark.streaming.ui.StreamingPage.render(StreamingPage.scala:157)
org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:79)
org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:79)
org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:69)
javax.servlet.http.HttpServlet.service(HttpServlet.java:735)
javax.servlet.http.HttpServlet.service(HttpServlet.java:848)
org.spark-project.jetty.servlet.ServletHolder.handle(ServletHolder.java:684)
org.spark-project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501)
org.spark-project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086)
org.spark-project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:428)
org.spark-project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020)
org.spark-project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
org.spark-project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:255)
org.spark-project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
org.spark-project.jetty.server.Server.handle(Server.java:366)
org.spark-project.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494)
org.spark-project.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:971)
org.spark-project.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:1033)
org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:644)
org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
org.spark-project.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
org.spark-project.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:667)
org.spark-project.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52)
org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
java.lang.Thread.run(Thread.java:745)

Thread 33: SparkListenerBus (BLOCKED)
org.apache.spark.streaming.ui.StreamingJobProgressListener.onJobStart(StreamingJobProgressListener.scala:123)
org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:34)
org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:56)
org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:79)
org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1136)
org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)
Thread 541: streaming-job-executor-0 (WAITING)
Thread 131: StreamingListenerBus (BLOCKED)

can I use ExectorService in my driver? was: is dataframe.write() async? Streaming performance problem

2016-07-08 Thread Andy Davidson
Hi Ewan

Currently I split my dataframe into n smaller dataframes and call
write().json("s3://...")

Each data frame becomes a single S3 object.

I assume for your solution to work I would need to repartition(1) each of the
smaller sets so that each is written as a single S3 object.


I am also considering using a Java ExecutorService and thread pool. It's easy
to do. Each thread would call df.write.json("s3://..."); one advantage of this
is that I do not need to make any assumptions about how Spark is
implemented.

I assume the thread pool is running on the driver so the slaves do not incur
any extra overhead.
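
A rough sketch of that driver-side thread pool idea, assuming `frames` is the already-split collection of (tag, DataFrame) pairs and `pathFor` builds the S3 prefix for each tag; the pool only parallelises job submission on the driver, while the writes themselves still run as normal Spark jobs on the executors:

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

val pool = Executors.newFixedThreadPool(8)
implicit val ec = ExecutionContext.fromExecutorService(pool)

// Submit one write job per tag concurrently, then wait for all of them.
val writes = frames.map { case (tag, df) =>
  Future { df.write.json(pathFor(tag)) }
}
Await.result(Future.sequence(writes), Duration.Inf)
pool.shutdown()
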

Thanks

Andy

From:  Ewan Leith 
Date:  Friday, July 8, 2016 at 8:52 AM
To:  Cody Koeninger , Andrew Davidson

Cc:  "user @spark" 
Subject:  RE: is dataframe.write() async? Streaming performance problem

> Writing (or reading) small files from spark to s3 can be seriously slow.
> 
> You'll get much higher throughput by doing a df.foreachPartition(partition =>
> ...) and inside each partition, creating an aws s3 client then doing a
> partition.foreach and uploading the files using that s3 client with its own
> threadpool.
> 
> As long as you create the s3 client inside the foreachPartition, and close it
> after the partition.foreach(...) is done, you shouldn't have any issues.
> 
> Something roughly like this from the DStream docs:
> 
>   df.foreachPartition { partitionOfRecords =>
> val connection = createNewConnection()
> partitionOfRecords.foreach(record => connection.send(record))
> connection.close()
>   }
> 
> Hope this helps,
> Ewan
> 
> -Original Message-
> From: Cody Koeninger [mailto:c...@koeninger.org]
> Sent: 08 July 2016 15:31
> To: Andy Davidson 
> Cc: user @spark 
> Subject: Re: is dataframe.write() async? Streaming performance problem
> 
> Maybe obvious, but what happens when you change the s3 write to a println of
> all the data?  That should identify whether it's the issue.
> 
> count() and read.json() will involve additional tasks (run through the items
> in the rdd to count them, likewise to infer the schema) but for
> 300 records that shouldn't be much of an issue.
> 
> On Thu, Jul 7, 2016 at 3:59 PM, Andy Davidson 
> wrote:
>>  I am running Spark 1.6.1 built for Hadoop 2.0.0-mr1-cdh4.2.0 and using
>>  kafka direct stream approach. I am running into performance problems.
>>  My processing time is > than my window size. Changing window sizes,
>>  adding cores and executor memory does not change performance. I am
>>  having a lot of trouble identifying the problem by at the metrics
>>  provided for streaming apps in the spark application web UI.
>> 
>>  I think my performance problem has to with writing the data to S3.
>> 
>>  My app receives very complicated JSON. My program is simple, It sorts
>>  the data into a small set of sets and writes each set as a separate S3
>> object.
>>  The mini batch data has at most 300 events so I do not think shuffle
>>  is an issue.
>> 
>>  DataFrame rawDF = sqlContext.read().json(jsonRDD).cache();
>> 
>>  … Explode tagCol …
>> 
>> 
>>  DataFrame rulesDF = activityDF.select(tagCol).distinct();
>> 
>>  Row[] rows = rulesDF.select(tagCol).collect();
>> 
>>  List tags = new ArrayList(100);
>> 
>>  for (Row row : rows) {
>> 
>>  Object tag = row.get(0);
>> 
>>  tags.add(tag.toString());
>> 
>>  }
>> 
>> 
>>  I think the for loop below is where the bottleneck is. Is write() async?
>> 
>> 
>>  If not, is there an easy way to vectorize/parallelize this for loop, or
>>  do I have to create the threads myself?
>> 
>> 
>>  Is creating threads in spark a bad idea?
>> 
>> 
>> 
>>  for(String tag : tags) {
>> 
>>  DataFrame saveDF =
>>  activityDF.filter(activityDF.col(tagCol).equalTo(tag));
>> 
>>  if (saveDF.count() >= 1) { // I do not think count() is an issue
>>  performance is about 34 ms
>> 
>>  String dirPath = "s3n://myBucket" + File.separator + date +
>>  File.separator + tag + File.separator +  milliSeconds;
>> 
>>  saveDF.write().json(dirPath);
>> 
>>  }
>> 
>>  }
>> 
>> 
>>  Any suggestions would be greatly appreciated
>> 
>> 
>>  Andy
>> 
>> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 




Iterate over columns in sql.dataframe

2016-07-08 Thread Pasquinell Urbani
Hi all

I need to apply QuantileDiscretizer() over a 16-column sql.DataFrame.
What is the most efficient way to apply a function over each column? Do I
need to iterate over the columns? What is the best way to do this?

Thank you all.
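
One possible approach, sketched below under the assumption that df is the 16-column DataFrame and that every column should be discretized: build one QuantileDiscretizer stage per column and fit them together in a single Pipeline, rather than looping and fitting one at a time. The "_binned" suffix and bucket count are arbitrary placeholders.

import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.QuantileDiscretizer

// One discretizer stage per input column.
val stages = df.columns.map { c =>
  new QuantileDiscretizer()
    .setInputCol(c)
    .setOutputCol(s"${c}_binned")
    .setNumBuckets(10): PipelineStage
}

val binned = new Pipeline().setStages(stages).fit(df).transform(df)
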


Spark Terasort Help

2016-07-08 Thread Punit Naik
Hi Guys

I am trying to run spark terasort benchmark provided by ehiggs
 on github. Terasort on 1 gb, 10
gb and 100gb works fine. But when it comes to 1000 gb, the program seems to
run into problems. The 1000 gb terasort actually completes on single-node
in 5 hours or so. But in case of multi-node, it always fails.

The errors show that executors are being lost. And they keep on failing
till the job is automatically killed. Again, 1000 gb terasort completes
with single-node. Its multi-node which is the problem. I guess there are
some co-ordination and timeout issues between the nodes.

The command that I am using is:

time $SPARK_HOME/bin/spark-submit --master spark://master-ip:7077 --conf
"spark.akka.timeout=2400" --conf "spark.akka.askTimeout=2400" --conf
"spark.akka.frameSize=500" --conf
"spark.core.connection.ack.wait.timeout=2400" --conf
"spark.driver.maxResultSize=16g" --conf "spark.driver.cores=10" --conf
"spark.executor.memory=4g" --conf "spark.driver.memory=50g" --driver-memory
50g --conf "spark.eventLog.enabled=true" --conf
"spark.eventLog.dir=hdfs://master-ip:54310/sparkevents" --conf
"spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties"
--class com.github.ehiggs.spark.terasort.TeraSort
~/spark-terasort/target/spark-terasort-1.0-jar-with-dependencies.jar
hdfs://master-ip:54310/teragen-1t hdfs://master-ip:54310/terasort-1t

The 1 tb teragen that I ran prior to this had 2 partitions
("spark.default.parallelism=2").

And these are the specifications and configurations:

hardware:
1 master, 2 slaves
master -> 96 cores, 137 gb ram
slaves -> 192 cores, 237 gb ram

spark configuration:
slaves -> 64 workers, 3 cores for each worker, 3 gb RAM to each worker

I am running the program from another machine which is not the part of this
cluster but has SSH access to and from every machine.

I have tried it with a lot of configurations but every time it failed. The
one above is the latest one which is failing.

Can anyone help me in designing the configuration or set some properties
which will not result in executors failing and let the tersort complete?

-- 
Thank You

Regards

Punit Naik


RE: is dataframe.write() async? Streaming performance problem

2016-07-08 Thread Ewan Leith
Writing (or reading) small files from spark to s3 can be seriously slow.

You'll get much higher throughput by doing a df.foreachPartition(partition => 
...) and inside each partition, creating an aws s3 client then doing a 
partition.foreach and uploading the files using that s3 client with its own 
threadpool.

As long as you create the s3 client inside the foreachPartition, and close it 
after the partition.foreach(...) is done, you shouldn't have any issues.

Something roughly like this from the DStream docs:

  df.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
  }

Hope this helps,
Ewan

-Original Message-
From: Cody Koeninger [mailto:c...@koeninger.org] 
Sent: 08 July 2016 15:31
To: Andy Davidson 
Cc: user @spark 
Subject: Re: is dataframe.write() async? Streaming performance problem

Maybe obvious, but what happens when you change the s3 write to a println of 
all the data?  That should identify whether it's the issue.

count() and read.json() will involve additional tasks (run through the items in 
the rdd to count them, likewise to infer the schema) but for
300 records that shouldn't be much of an issue.

On Thu, Jul 7, 2016 at 3:59 PM, Andy Davidson  
wrote:
> I am running Spark 1.6.1 built for Hadoop 2.0.0-mr1-cdh4.2.0 and using 
> kafka direct stream approach. I am running into performance problems. 
> My processing time is > than my window size. Changing window sizes, 
> adding cores and executor memory does not change performance. I am 
> having a lot of trouble identifying the problem by at the metrics 
> provided for streaming apps in the spark application web UI.
>
> I think my performance problem has to with writing the data to S3.
>
> My app receives very complicated JSON. My program is simple, It sorts 
> the data into a small set of sets and writes each set as a separate S3 object.
> The mini batch data has at most 300 events so I do not think shuffle 
> is an issue.
>
> DataFrame rawDF = sqlContext.read().json(jsonRDD).cache();
>
> … Explode tagCol …
>
>
> DataFrame rulesDF = activityDF.select(tagCol).distinct();
>
> Row[] rows = rulesDF.select(tagCol).collect();
>
> List tags = new ArrayList(100);
>
> for (Row row : rows) {
>
> Object tag = row.get(0);
>
> tags.add(tag.toString());
>
> }
>
>
> I think the for loop below is where the bottleneck is. Is write() async?
>
>
> If not, is there an easy way to vectorize/parallelize this for loop, or
> do I have to create the threads myself?
>
>
> Is creating threads in spark a bad idea?
>
>
>
> for(String tag : tags) {
>
> DataFrame saveDF = 
> activityDF.filter(activityDF.col(tagCol).equalTo(tag));
>
> if (saveDF.count() >= 1) { // I do not think count() is an issue 
> performance is about 34 ms
>
> String dirPath = "s3n://myBucket" + File.separator + date + 
> File.separator + tag + File.separator +  milliSeconds;
>
> saveDF.write().json(dirPath);
>
> }
>
> }
>
>
> Any suggestions would be greatly appreciated
>
>
> Andy
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: is dataframe.write() async? Streaming performance problem

2016-07-08 Thread Cody Koeninger
Maybe obvious, but what happens when you change the s3 write to a
println of all the data?  That should identify whether it's the issue.

count() and read.json() will involve additional tasks (run through the
items in the rdd to count them, likewise to infer the schema) but for
300 records that shouldn't be much of an issue.

On Thu, Jul 7, 2016 at 3:59 PM, Andy Davidson
 wrote:
> I am running Spark 1.6.1 built for Hadoop 2.0.0-mr1-cdh4.2.0 and using kafka
> direct stream approach. I am running into performance problems. My
> processing time is > than my window size. Changing window sizes, adding
> cores and executor memory does not change performance. I am having a lot of
> trouble identifying the problem by at the metrics provided for streaming
> apps in the spark application web UI.
>
> I think my performance problem has to with writing the data to S3.
>
> My app receives very complicated JSON. My program is simple, It sorts the
> data into a small set of sets and writes each set as a separate S3 object.
> The mini batch data has at most 300 events so I do not think shuffle is an
> issue.
>
> DataFrame rawDF = sqlContext.read().json(jsonRDD).cache();
>
> … Explode tagCol …
>
>
> DataFrame rulesDF = activityDF.select(tagCol).distinct();
>
> Row[] rows = rulesDF.select(tagCol).collect();
>
> List tags = new ArrayList(100);
>
> for (Row row : rows) {
>
> Object tag = row.get(0);
>
> tags.add(tag.toString());
>
> }
>
>
> I think the for loop below is where the bottleneck is. Is write() async?
>
>
> If not, is there an easy way to vectorize/parallelize this for loop, or do I
> have to create the threads myself?
>
>
> Is creating threads in spark a bad idea?
>
>
>
> for(String tag : tags) {
>
> DataFrame saveDF = activityDF.filter(activityDF.col(tagCol).equalTo(tag));
>
> if (saveDF.count() >= 1) { // I do not think count() is an issue performance
> is about 34 ms
>
> String dirPath = "s3n://myBucket" + File.separator + date + File.separator +
> tag + File.separator +  milliSeconds;
>
> saveDF.write().json(dirPath);
>
> }
>
> }
>
>
> Any suggestions would be greatly appreciated
>
>
> Andy
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark as sql engine on S3

2016-07-08 Thread Mich Talebzadeh
You can have two approaches here.

Use Hive as it is and replace the Hive execution engine with Spark. You can
use beeline against the Hive thrift server to access your Hive tables.

beeline connects to the thrift server (either Hive or Spark). If you use
spark thrift server with beeline then you are going to take advantage of
Spark SQL.

If you are going to use beeline with Hive thrift server with Hive using
Spark or Tez (well I don't use Tez) then you will use the Hive CBO + Spark.
Hive SQL is a superset of Spark SQL. So you can try either.

HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.
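
As a concrete sketch of the external-table part, with a placeholder table name and bucket, and assuming an S3 connector such as s3a is on the classpath and sqlContext is a HiveContext: once such a table is registered in the shared metastore, beeline sessions against the Spark thrift server can query it directly.

// Register an existing S3 dataset as an external table, then query it.
sqlContext.sql(
  """CREATE EXTERNAL TABLE IF NOT EXISTS sales (id INT, amount DOUBLE)
    |STORED AS PARQUET
    |LOCATION 's3a://my-bucket/warehouse/sales'""".stripMargin)

sqlContext.sql("SELECT COUNT(*) FROM sales").show()
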



On 8 July 2016 at 10:49, Ashok Kumar  wrote:

> Hi
>
> As I said we have using Hive as our SQL engine for the datasets but we are
> storing data externally in amazon S3,
>
> Now you suggested Spark thrift server.
>
> Started Spark thrift server on port 10001 and I have used beeline that
> accesses thrift server.
>
> Connecting to jdbc:hive2://<host>:10001
> Connected to: Spark SQL (version 1.6.1)
> Driver: Spark Project Core (version 1.6.1)
> Transaction isolation: TRANSACTION_REPEATABLE_READ
> Beeline version 1.6.1 by Apache Hive
>
> Now I just need to access my external tables on S3 as I do it on Hive with
> beeline connected to Hive thrift server?
>
> The advantage is that using Spark SQL will be much faster?
>
> regards
>
>
>
>
> On Friday, 8 July 2016, 6:30, ayan guha  wrote:
>
>
> Yes, it can.
>
> On Fri, Jul 8, 2016 at 3:03 PM, Ashok Kumar  wrote:
>
> thanks so basically Spark Thrift Server runs on a port much like beeline
> that uses JDBC to connect to Hive?
>
> Can Spark thrift server access Hive tables?
>
> regards
>
>
> On Friday, 8 July 2016, 5:27, ayan guha  wrote:
>
>
> Spark Thrift Server... works as a JDBC server. You can connect to it from
> any JDBC tool like SQuirreL.
>
> On Fri, Jul 8, 2016 at 3:50 AM, Ashok Kumar 
> wrote:
>
> Hello gurus,
>
> We are storing data externally on Amazon S3
>
> What is the optimum or best way to use Spark as SQL engine to access data
> on S3?
>
> Any info/write up will be greatly appreciated.
>
> Regards
>
>
>
>
> --
> Best Regards,
> Ayan Guha
>
>
>
>
>
> --
> Best Regards,
> Ayan Guha
>
>
>


Re: Why so many parquet file part when I store data in Alluxio or File?

2016-07-08 Thread Chanh Le
Hi Gene,
Thanks for your support. I agree with you that it is due to the number of executors, but many
parquet files hurt read performance, so I need a way to improve that. The
way I work around it is:

  df.coalesce(1)
    .write.mode(SaveMode.Overwrite).partitionBy("network_id")
    .parquet(s"$alluxioURL/$outFolderName/time=${dailyFormat.print(jobRunTime)}")

I know this is not good because it creates a shuffle and costs time, but reads
improve a lot. Right now, I am using that method to partition my data.
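
An alternative sketch, in case coalesce(1) becomes the bottleneck: repartition by the same column the output is partitioned by, so all rows of a given network_id land in one task and each partition directory ends up with roughly one file, without funnelling the whole dataset through a single task (this still shuffles, but keeps the writes parallel).

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

df.repartition(col("network_id"))
  .write.mode(SaveMode.Overwrite).partitionBy("network_id")
  .parquet(s"$alluxioURL/$outFolderName/time=${dailyFormat.print(jobRunTime)}")
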

Regards,
Chanh


> On Jul 8, 2016, at 8:33 PM, Gene Pang  wrote:
> 
> Hi Chanh,
> 
> You should be able to set the Alluxio block size with:
> 
> sc.hadoopConfiguration.set("alluxio.user.block.size.bytes.default", "256mb")
> 
> I think you have many parquet files because you have many Spark executors 
> writing out their partition of the files.
> 
> Hope that helps,
> Gene
> 
> On Sun, Jul 3, 2016 at 8:02 PM, Chanh Le  > wrote:
> Hi Gene,
> Could you give some suggestions on that?
> 
> 
> 
>> On Jul 1, 2016, at 5:31 PM, Ted Yu > > wrote:
>> 
>> The comment from zhangxiongfei was from a year ago.
>> 
>> Maybe something changed since them ?
>> 
>> On Fri, Jul 1, 2016 at 12:07 AM, Chanh Le > > wrote:
>> Hi Ted,
>> I set sc.hadoopConfiguration.setBoolean("fs.hdfs.impl.disable.cache", true)
>> sc.hadoopConfiguration.setLong("fs.local.block.size", 268435456)
>> but It seems not working.
>> 
>> 
>> 
>> 
>>> On Jul 1, 2016, at 11:38 AM, Ted Yu >> > wrote:
>>> 
>>> Looking under Alluxio source, it seems only "fs.hdfs.impl.disable.cache" is 
>>> in use.
>>> 
>>> FYI
>>> 
>>> On Thu, Jun 30, 2016 at 9:30 PM, Deepak Sharma >> > wrote:
>>> Ok.
>>> I came across this issue.
>>> Not sure if you already assessed this:
>>> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-6921 
>>> 
>>> The workaround mentioned may work for you .
>>> 
>>> Thanks
>>> Deepak
>>> 
>>> On 1 Jul 2016 9:34 am, "Chanh Le" >> > wrote:
>>> Hi Deepark,
>>> Thanks for replying. The way to write into alluxio is
>>> df.write.mode(SaveMode.Append).partitionBy("network_id",
>>> "time").parquet("alluxio://master1:1/FACT_ADMIN_HOURLY")
>>> 
>>> 
>>> I partition by 2 columns and store. I just want it, when I write, to automatically
>>> write parts of the proper size, matching the 512MB per block I already set in Alluxio.
>>> 
>>> 
 On Jul 1, 2016, at 11:01 AM, Deepak Sharma > wrote:
 
 Before writing, coalesce your RDD to 1.
 It will create only 1 output file .
 Multiple part file happens as all your executors will be writing their 
 partitions to separate part files.
 
 Thanks
 Deepak
 
 On 1 Jul 2016 8:01 am, "Chanh Le" > wrote:
 Hi everyone,
 I am using Alluxio for storage. But I am little bit confuse why I am do 
 set block size of alluxio is 512MB and my file part only few KB and too 
 many part.
 Is that normal? Because I want to read it fast? Is that many part effect 
 the read operation?
 How to set the size of file part?
 
 Thanks.
 Chanh
 
 
 
  
 
 
>>> 
>>> 
>> 
>> 
> 
> 



Re: Why is KafkaUtils.createRDD offsetRanges an Array rather than a Seq?

2016-07-08 Thread Cody Koeninger
Yeah, it's a reasonable lowest common denominator between java and scala,
and what's passed to that convenience constructor is actually what's used
to construct the class.

FWIW, in the 0.10 direct stream api when there's unavoidable wrapping /
conversion anyway (since the underlying class takes a
java.util.Collection), the Scala convenience methods take an Iterable.
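
For anyone landing here from a search, a small sketch of the call in question against the 0.8 API, with placeholder broker/topic names and offsets; a Seq can always be handed over with .toArray:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val offsetRanges = Seq(
  OffsetRange("my-topic", 0, 0L, 100L),   // topic, partition, fromOffset, untilOffset
  OffsetRange("my-topic", 1, 0L, 100L)
).toArray

val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)
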

On Fri, Jul 8, 2016 at 5:55 AM, Sean Owen  wrote:

> Java-friendliness is the usual reason, though I don't know if that's the
> reason here.
>
> On Fri, Jul 8, 2016 at 10:42 AM, Mikael Ståldal  > wrote:
>
>> Is there any particular reason for the offsetRanges parameter to
>> KafkaUtils.createRDD is Array[OffsetRange]? Why not Seq[OffsetRange]?
>>
>>
>> http://spark.apache.org/docs/latest/api/scala/org/apache/spark/streaming/kafka/KafkaUtils$.html
>>
>>
>> https://github.com/apache/spark/blob/master/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala#L248
>>
>> --
>> [image: MagineTV]
>>
>> *Mikael Ståldal*
>> Senior software developer
>>
>> *Magine TV*
>> mikael.stal...@magine.com
>> Grev Turegatan 3  | 114 46 Stockholm, Sweden  |   www.magine.com
>>
>> Privileged and/or Confidential Information may be contained in this
>> message. If you are not the addressee indicated in this message
>> (or responsible for delivery of the message to such a person), you may
>> not copy or deliver this message to anyone. In such case,
>> you should destroy this message and kindly notify the sender by reply
>> email.
>>
>
>


Re: Memory grows exponentially

2016-07-08 Thread Cody Koeninger
Just as an offhand guess, are you doing something like
updateStateByKey without expiring old keys?
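
For reference, a sketch of what "expiring old keys" can look like, assuming `events` is a placeholder DStream of (key, count) pairs; returning None from the update function drops a key from the state instead of letting the state RDD grow without bound. The (total, lastSeen) state shape and the 30-minute cutoff are placeholders.

def update(batch: Seq[Long], state: Option[(Long, Long)]): Option[(Long, Long)] = {
  val now = System.currentTimeMillis()
  val (total, lastSeen) = state.getOrElse((0L, now))
  if (batch.isEmpty && now - lastSeen > 30 * 60 * 1000L) {
    None  // expire: the key is removed from the state
  } else {
    Some((total + batch.sum, if (batch.nonEmpty) now else lastSeen))
  }
}

val totals = events.updateStateByKey(update _)
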

On Fri, Jul 8, 2016 at 2:44 AM, Jörn Franke  wrote:
> Memory fragmentation? Quiet common with in-memory systems.
>
>> On 08 Jul 2016, at 08:56, aasish.kumar  wrote:
>>
>> Hello everyone:
>>
>> I have been facing a problem associated with Spark Streaming memory.
>>
>> I have been running two Spark Streaming jobs concurrently. The jobs read
>> data from Kafka with a batch interval of 1 minute, performs aggregation, and
>> sinks the computed data to MongoDB using using stratio-mongodb connector.
>>
>> I have setup the spark standalone cluster on AWS. My setup is configured as
>> follows: I have a four-node cluster. One node as a master, and the rest
>> 3-nodes as workers, while each worker has only one executor, with 2-cores
>> and 8GB of RAM.
>>
>> Currently, I am processing seven-hundred thousand JSON events, every minute.
>> After running the jobs for 3-4 hours, I have observed that the memory
>> consumption keeps growing, eventually causing one of the jobs to exit.
>>
>> Despite setting spark.cleaner.ttl to 600 seconds, and having used the
>> rdd.unpersist method at the end of the job, I am not able to understand
>> why the memory consumption keeps growing over time. I am unable to solve this
>> problem. I would appreciate it if someone could help me solve it or provide
>> pointers as to why this is happening.
>>
>> Thank you.
>>
>>
>>
>>
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Memory-grows-exponentially-tp27308.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Simultaneous spark Jobs execution.

2016-07-08 Thread Mich Talebzadeh
If you are running in Local mode, then you can submit many jobs. As long as
your hardware has resources to do multiple jobs, there won't be any
dependency. In other words, each app (spark-submit) will run in its own JVM,
unaware of the others. Local mode is good for testing.

HTH



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 8 July 2016 at 14:09, Jacek Laskowski  wrote:

> On 8 Jul 2016 2:03 p.m., "Mazen"  wrote:
> >
> > Does Spark handle simultaneous execution of jobs within an application
>
> Yes. Run as many Spark jobs as you want and Spark will queue them given
> CPU and RAM available for you in the cluster.
>
> > job execution is blocking i.e. a new job can not be initiated until the
> > previous one commits.
>
> That's how usually people code their apps. The more advanced approach is
> to use SparkContext from multiple threads and execute actions (that will
> submit jobs).
>
> >  What does it mean that :  "Spark’s scheduler is fully thread-safe"
>
> You can use a single SparkContext from multiple threads.
>
> Jacek
>
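
A minimal sketch of that multi-threaded approach, assuming rddA and rddB are placeholder RDDs built on one SparkContext: each Future submits an action from its own thread, and the (thread-safe) scheduler can run the resulting jobs concurrently, resources permitting.

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Two independent actions submitted from two threads on the same SparkContext.
val jobA = Future { rddA.count() }
val jobB = Future { rddB.saveAsTextFile("hdfs:///tmp/jobB-output") }

Await.result(jobA, Duration.Inf)
Await.result(jobB, Duration.Inf)
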


RE: Spark with HBase Error - Py4JJavaError

2016-07-08 Thread Puneet Tripathi
Hi Ram, Thanks very much it worked.

Puneet

From: ram kumar [mailto:ramkumarro...@gmail.com]
Sent: Thursday, July 07, 2016 6:51 PM
To: Puneet Tripathi
Cc: user@spark.apache.org
Subject: Re: Spark with HBase Error - Py4JJavaError

Hi Puneet,
Have you tried appending
 --jars $SPARK_HOME/lib/spark-examples-*.jar
to the execution command?
Ram

On Thu, Jul 7, 2016 at 5:19 PM, Puneet Tripathi 
> wrote:
Guys, Please can anyone help on the issue below?

Puneet

From: Puneet Tripathi 
[mailto:puneet.tripa...@dunnhumby.com]
Sent: Thursday, July 07, 2016 12:42 PM
To: user@spark.apache.org
Subject: Spark with HBase Error - Py4JJavaError

Hi,

We are running HBase in fully distributed mode. I tried to connect to HBase via
pyspark and then write to HBase using saveAsNewAPIHadoopDataset, but it failed;
the error says:

Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.saveAsHadoopDataset.
: java.lang.ClassNotFoundException: 
org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
I have been able to create pythonconverters.jar and then did below:


1.  I think we have to copy this to a location on HDFS; /sparkjars/ seems as
good a directory to create as any. I think the file has to be world-readable.

2.  Set the spark_jar_hdfs_path property in Cloudera Manager e.g. 
hdfs:///sparkjars

It still doesn't seem to work; can someone please help me with this?

Regards,
Puneet
dunnhumby limited is a limited company registered in England and Wales with 
registered number 02388853 and VAT registered number 927 5871 83. Our 
registered office is at Aurora House, 71-75 Uxbridge Road, London W5 5SL. The 
contents of this message and any attachments to it are confidential and may be 
legally privileged. If you have received this message in error you should 
delete it from your system immediately and advise the sender. dunnhumby may 
monitor and record all emails. The views expressed in this email are those of 
the sender and not those of dunnhumby.


Re: Why so many parquet file part when I store data in Alluxio or File?

2016-07-08 Thread Gene Pang
Hi Chanh,

You should be able to set the Alluxio block size with:

sc.hadoopConfiguration.set("alluxio.user.block.size.bytes.default", "256mb")

I think you have many parquet files because you have many Spark executors
writing out their partition of the files.

Hope that helps,
Gene

On Sun, Jul 3, 2016 at 8:02 PM, Chanh Le  wrote:

> Hi Gene,
> Could you give some suggestions on that?
>
>
>
> On Jul 1, 2016, at 5:31 PM, Ted Yu  wrote:
>
> The comment from zhangxiongfei was from a year ago.
>
> Maybe something changed since them ?
>
> On Fri, Jul 1, 2016 at 12:07 AM, Chanh Le  wrote:
>
>> Hi Ted,
>> I set sc.hadoopConfiguration.setBoolean("fs.hdfs.impl.disable.cache",
>> true)
>>
>> sc.hadoopConfiguration.setLong("fs.local.block.size", 268435456)
>>
>> but It seems not working.
>>
>> 
>>
>>
>> On Jul 1, 2016, at 11:38 AM, Ted Yu  wrote:
>>
>> Looking under Alluxio source, it seems only "fs.hdfs.impl.disable.cache"
>> is in use.
>>
>> FYI
>>
>> On Thu, Jun 30, 2016 at 9:30 PM, Deepak Sharma 
>> wrote:
>>
>>> Ok.
>>> I came across this issue.
>>> Not sure if you already assessed this:
>>> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-6921
>>>
>>> The workaround mentioned may work for you .
>>>
>>> Thanks
>>> Deepak
>>> On 1 Jul 2016 9:34 am, "Chanh Le"  wrote:
>>>
 Hi Deepark,
 Thank for replying. The way to write into alluxio is
 df.write.mode(SaveMode.Append).partitionBy("network_id", "time"
 ).parquet("alluxio://master1:1/FACT_ADMIN_HOURLY")


 I partition by 2 columns and store. I just want when I write
 it automatic write a size properly for what I already set in Alluxio 512MB
 per block.


 On Jul 1, 2016, at 11:01 AM, Deepak Sharma 
 wrote:

 Before writing, coalesce your RDD to 1.
 It will create only 1 output file .
 Multiple part file happens as all your executors will be writing their
 partitions to separate part files.

 Thanks
 Deepak
 On 1 Jul 2016 8:01 am, "Chanh Le"  wrote:

 Hi everyone,
 I am using Alluxio for storage, but I am a little bit confused: I set the
 Alluxio block size to 512MB, yet each file part is only a few KB and there
 are too many parts.
 Is that normal? I want to read it fast; do that many parts
 affect the read operation?
 How do I set the size of the file parts?

 Thanks.
 Chanh





 



>>
>>
>
>
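
For reference, a minimal sketch combining the two suggestions above (block size plus
fewer partitions). It assumes a spark-shell style session where sc, df and outputPath
(the alluxio:// URI used earlier in this thread) already exist; the coalesce target of
8 is purely illustrative:

import org.apache.spark.sql.SaveMode

// Block size the Alluxio client uses for new files (property name from Gene's reply).
sc.hadoopConfiguration.set("alluxio.user.block.size.bytes.default", "512MB")

// Each task writes one part file per output partition, so reducing the number of
// partitions before the write reduces the number of part files.
df.coalesce(8)
  .write
  .mode(SaveMode.Append)
  .partitionBy("network_id", "time")
  .parquet(outputPath)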


Re: Is the operation inside foreachRDD supposed to be blocking?

2016-07-08 Thread Mikael Ståldal
Yes, this is a stupid example.

In my real code the processItem method is using some third-party library
which does things asynchronously and returns a Future.

On Fri, Jul 8, 2016 at 3:11 PM, Sean Owen  wrote:

> You can write this code. I don't think it will do anything useful because
> you're executing asynchronously but then just blocking waiting for
> completion. It seems the same as just doing all the work in processItems()
> directly.
>
> On Fri, Jul 8, 2016 at 1:56 PM, Mikael Ståldal 
> wrote:
>
>> I am not sure I fully understand your answer.
>>
>> Is this code correct?
>>
>> def main() {
>>   KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, 
>> DefaultDecoder](streamingContext, configs, topics).foreachRDD { rdd =>
>> Await.ready(processItems(rdd.collect()), Duration.Inf)
>>   }
>> }
>>
>> def processItems(items: Array[(String, Array[Byte])]): Future[Unit] = {
>>   // start some work which may take a while and immediately return a Future 
>> to keep track of the work
>> }
>>
>>
>> On Fri, Jul 8, 2016 at 12:56 PM, Sean Owen  wrote:
>>
>>> It's no different than any other operation on an RDD. A transformation
>>> doesn't actually do anything by itself, so does not block. An action
>>> triggers computation and blocks until the action completes. You can wait
>>> for it with a Future, sure.
>>>
>>> On Fri, Jul 8, 2016 at 10:43 AM, Mikael Ståldal <
>>> mikael.stal...@magine.com> wrote:
>>>
 In a Spark Streaming job, is the operation inside foreachRDD supposed
 to synchronous / blocking?

 What if you do some asynchronous operation which returns a Future? Are
 you then supposed to do Await on that Future?
 --
 [image: MagineTV]

 *Mikael Ståldal*
 Senior software developer

 *Magine TV*
 mikael.stal...@magine.com
 Grev Turegatan 3  | 114 46 Stockholm, Sweden  |   www.magine.com

 Privileged and/or Confidential Information may be contained in this
 message. If you are not the addressee indicated in this message
 (or responsible for delivery of the message to such a person), you may
 not copy or deliver this message to anyone. In such case,
 you should destroy this message and kindly notify the sender by reply
 email.

>>>
>>>
>>
>>
>> --
>> [image: MagineTV]
>>
>> *Mikael Ståldal*
>> Senior software developer
>>
>> *Magine TV*
>> mikael.stal...@magine.com
>> Grev Turegatan 3  | 114 46 Stockholm, Sweden  |   www.magine.com
>>
>> Privileged and/or Confidential Information may be contained in this
>> message. If you are not the addressee indicated in this message
>> (or responsible for delivery of the message to such a person), you may
>> not copy or deliver this message to anyone. In such case,
>> you should destroy this message and kindly notify the sender by reply
>> email.
>>
>
>


-- 
[image: MagineTV]

*Mikael Ståldal*
Senior software developer

*Magine TV*
mikael.stal...@magine.com
Grev Turegatan 3  | 114 46 Stockholm, Sweden  |   www.magine.com

Privileged and/or Confidential Information may be contained in this
message. If you are not the addressee indicated in this message
(or responsible for delivery of the message to such a person), you may not
copy or deliver this message to anyone. In such case,
you should destroy this message and kindly notify the sender by reply
email.
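
A minimal sketch of the pattern being discussed, pieced together from the snippets in
this thread (the stream, processItems() and the timeout value are assumptions, not a
confirmed recipe):

import scala.concurrent.Await
import scala.concurrent.duration._

stream.foreachRDD { rdd =>
  // Collecting assumes each micro-batch comfortably fits in the driver.
  val items = rdd.collect()
  // processItems() kicks off asynchronous work and returns a Future, as above.
  val work = processItems(items)
  // Awaiting keeps the batch from being considered complete before the async work
  // finishes; Await.result (unlike Await.ready) also surfaces a failure, and a
  // bounded timeout fails a hung call visibly instead of stalling the job forever.
  Await.result(work, 2.minutes)
}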


Re: Is the operation inside foreachRDD supposed to be blocking?

2016-07-08 Thread Sean Owen
You can write this code. I don't think it will do anything useful because
you're executing asynchronously but then just blocking waiting for
completion. It seems the same as just doing all the work in processItems()
directly.

On Fri, Jul 8, 2016 at 1:56 PM, Mikael Ståldal 
wrote:

> I am not sure I fully understand your answer.
>
> Is this code correct?
>
> def main() {
>   KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, 
> DefaultDecoder](streamingContext, configs, topics).foreachRDD { rdd =>
> Await.ready(processItems(rdd.collect()), Duration.Inf)
>   }
> }
>
> def processItems(items: Array[(String, Array[Byte])]): Future[Unit] = {
>   // start some work which may take a while and immediately return a Future 
> to keep track of the work
> }
>
>
> On Fri, Jul 8, 2016 at 12:56 PM, Sean Owen  wrote:
>
>> It's no different than any other operation on an RDD. A transformation
>> doesn't actually do anything by itself, so does not block. An action
>> triggers computation and blocks until the action completes. You can wait
>> for it with a Future, sure.
>>
>> On Fri, Jul 8, 2016 at 10:43 AM, Mikael Ståldal <
>> mikael.stal...@magine.com> wrote:
>>
>>> In a Spark Streaming job, is the operation inside foreachRDD supposed to
>>> synchronous / blocking?
>>>
>>> What if you do some asynchronous operation which returns a Future? Are
>>> you then supposed to do Await on that Future?
>>> --
>>> [image: MagineTV]
>>>
>>> *Mikael Ståldal*
>>> Senior software developer
>>>
>>> *Magine TV*
>>> mikael.stal...@magine.com
>>> Grev Turegatan 3  | 114 46 Stockholm, Sweden  |   www.magine.com
>>>
>>> Privileged and/or Confidential Information may be contained in this
>>> message. If you are not the addressee indicated in this message
>>> (or responsible for delivery of the message to such a person), you may
>>> not copy or deliver this message to anyone. In such case,
>>> you should destroy this message and kindly notify the sender by reply
>>> email.
>>>
>>
>>
>
>
> --
> [image: MagineTV]
>
> *Mikael Ståldal*
> Senior software developer
>
> *Magine TV*
> mikael.stal...@magine.com
> Grev Turegatan 3  | 114 46 Stockholm, Sweden  |   www.magine.com
>
> Privileged and/or Confidential Information may be contained in this
> message. If you are not the addressee indicated in this message
> (or responsible for delivery of the message to such a person), you may not
> copy or deliver this message to anyone. In such case,
> you should destroy this message and kindly notify the sender by reply
> email.
>


Re: Simultaneous spark Jobs execution.

2016-07-08 Thread Jacek Laskowski
On 8 Jul 2016 2:03 p.m., "Mazen"  wrote:
>
> Does Spark handle simultaneous execution of jobs within an application

Yes. Run as many Spark jobs as you want and Spark will queue them given CPU
and RAM available for you in the cluster.

> job execution is blocking i.e. a new job can not be initiated until the
> previous one commits.

That's how usually people code their apps. The more advanced approach is to
use SparkContext from multiple threads and execute actions (that will
submit jobs).

>  What does it mean that :  "Spark’s scheduler is fully thread-safe"

You can use a single SparkContext from multiple threads.

Jacek
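
A small sketch of the multi-threaded approach mentioned above, assuming an existing
sc and purely illustrative input paths:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Two actions triggered from different threads share one SparkContext, so the two
// jobs can be scheduled concurrently (subject to available cores and memory).
val jobA = Future { sc.textFile("/data/a").count() }
val jobB = Future { sc.textFile("/data/b").count() }

val countA = Await.result(jobA, 10.minutes)
val countB = Await.result(jobB, 10.minutes)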


spark logging best practices

2016-07-08 Thread vimal dinakaran
Hi,

http://stackoverflow.com/questions/29208844/apache-spark-logging-within-scala

What is the best way to capture spark logs without getting task not
serialzible error ?
The above link has various workarounds.

Also is there a way to dynamically set the log level when the application
is running ? (from warn to debug , without restarting the app)

Thanks
Vimal
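
One of the workarounds from that Stack Overflow thread, plus the runtime log-level
calls, sketched in Scala (the logger name and the RDD are illustrative):

import org.apache.log4j.{Level, LogManager}

rdd.foreachPartition { records =>
  // Look the logger up inside the task, so the closure captures nothing
  // non-serializable.
  val log = LogManager.getLogger("myapp.tasks")
  records.foreach(r => log.debug(s"processing record: $r"))
}

// Changing verbosity while the application runs, without a restart. Note this call
// adjusts the level in the JVM it runs in (the driver); executors keep the level
// from their own log4j configuration.
sc.setLogLevel("DEBUG")
LogManager.getLogger("myapp.tasks").setLevel(Level.DEBUG)
// ... and back:
sc.setLogLevel("WARN")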


Re: Is the operation inside foreachRDD supposed to be blocking?

2016-07-08 Thread Mikael Ståldal
I am not sure I fully understand your answer.

Is this code correct?

def main() {
  KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder,
DefaultDecoder](streamingContext, configs, topics).foreachRDD { rdd =>
Await.ready(processItems(rdd.collect()), Duration.Inf)
  }
}

def processItems(items: Array[(String, Array[Byte])]): Future[Unit] = {
  // start some work which may take a while and immediately return a
Future to keep track of the work
}


On Fri, Jul 8, 2016 at 12:56 PM, Sean Owen  wrote:

> It's no different than any other operation on an RDD. A transformation
> doesn't actually do anything by itself, so does not block. An action
> triggers computation and blocks until the action completes. You can wait
> for it with a Future, sure.
>
> On Fri, Jul 8, 2016 at 10:43 AM, Mikael Ståldal  > wrote:
>
>> In a Spark Streaming job, is the operation inside foreachRDD supposed to
>> synchronous / blocking?
>>
>> What if you do some asynchronous operation which returns a Future? Are
>> you then supposed to do Await on that Future?
>> --
>> [image: MagineTV]
>>
>> *Mikael Ståldal*
>> Senior software developer
>>
>> *Magine TV*
>> mikael.stal...@magine.com
>> Grev Turegatan 3  | 114 46 Stockholm, Sweden  |   www.magine.com
>>
>> Privileged and/or Confidential Information may be contained in this
>> message. If you are not the addressee indicated in this message
>> (or responsible for delivery of the message to such a person), you may
>> not copy or deliver this message to anyone. In such case,
>> you should destroy this message and kindly notify the sender by reply
>> email.
>>
>
>


-- 
[image: MagineTV]

*Mikael Ståldal*
Senior software developer

*Magine TV*
mikael.stal...@magine.com
Grev Turegatan 3  | 114 46 Stockholm, Sweden  |   www.magine.com

Privileged and/or Confidential Information may be contained in this
message. If you are not the addressee indicated in this message
(or responsible for delivery of the message to such a person), you may not
copy or deliver this message to anyone. In such case,
you should destroy this message and kindly notify the sender by reply
email.


Re: Bug about reading parquet files

2016-07-08 Thread Sea
My spark version is 1.6.1.


== Parsed Logical Plan ==
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#112L]
+- Limit 1
   +- Aggregate [(count(1),mode=Complete,isDistinct=false) AS events#0L]
  +- Filter ((concat(year#1,month#2,day#3,hour#4) = 2016070700) && (appid#5 
= 6))
 +- Subquery dwd_native
+- 
Relation[oid#6,eventid#7,clientid#8L,uid#9,passengerid#10L,phone_time#11L,server_time#12L,app_name#13,app_version#14,os_type#15,os_version#16,brand#17,model#18,idfa#19,udid#20,usid#21,ekey#22,mid#23,sdk_version#24,longitude#25,latitude#26,channel#27,tel#28,cip#29,country#30,province#31,city#32,cityid#33,label#34,attrs#35,attrs_n#36,fr#37,year#1,month#2,day#3,hour#4,appid#5]
 ParquetRelation: omega.dwd_native


== Analyzed Logical Plan ==
count: bigint
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#112L]
+- Limit 1
   +- Aggregate [(count(1),mode=Complete,isDistinct=false) AS events#0L]
  +- Filter ((concat(year#1,month#2,day#3,hour#4) = 2016070700) && (appid#5 
= 6))
 +- Subquery dwd_native
+- 
Relation[oid#6,eventid#7,clientid#8L,uid#9,passengerid#10L,phone_time#11L,server_time#12L,app_name#13,app_version#14,os_type#15,os_version#16,brand#17,model#18,idfa#19,udid#20,usid#21,ekey#22,mid#23,sdk_version#24,longitude#25,latitude#26,channel#27,tel#28,cip#29,country#30,province#31,city#32,cityid#33,label#34,attrs#35,attrs_n#36,fr#37,year#1,month#2,day#3,hour#4,appid#5]
 ParquetRelation: omega.dwd_native


== Optimized Logical Plan ==
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#112L]
+- Limit 1
   +- Aggregate
  +- Project
 +- Filter ((concat(year#1,month#2,day#3,hour#4) = 2016070700) && 
(appid#5 = 6))
+- 
Relation[oid#6,eventid#7,clientid#8L,uid#9,passengerid#10L,phone_time#11L,server_time#12L,app_name#13,app_version#14,os_type#15,os_version#16,brand#17,model#18,idfa#19,udid#20,usid#21,ekey#22,mid#23,sdk_version#24,longitude#25,latitude#26,channel#27,tel#28,cip#29,country#30,province#31,city#32,cityid#33,label#34,attrs#35,attrs_n#36,fr#37,year#1,month#2,day#3,hour#4,appid#5]
 ParquetRelation: omega.dwd_native


== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], 
output=[count#112L])
+- TungstenAggregate(key=[], 
functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#115L])
   +- Limit 1
  +- ConvertToSafe
 +- TungstenAggregate(key=[], functions=[], output=[])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[], output=[])
  +- Scan ParquetRelation: omega.dwd_native[] InputPaths: 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=0, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=1, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=2, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=3, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=4, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=5, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=6, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=0, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=1, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=2, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=3, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=4, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=5, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=6, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=0, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=1, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=2, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=3, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=4, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=5, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=6, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/03/appid=0, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/03/





Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
  at scala.collection.mutable.HashTable$class.resize(HashTable.scala:247)
  at 
scala.collection.mutable.HashTable$class.scala$collection$mutable$HashTable$$addEntry0(HashTable.scala:151)
  at 
scala.collection.mutable.HashTable$class.findOrAddEntry(HashTable.scala:163)
  at 
scala.collection.mutable.LinkedHashSet.findOrAddEntry(LinkedHashSet.scala:41)
  at scala.collection.mutable.LinkedHashSet.add(LinkedHashSet.scala:62)
  at scala.collection.mutable.LinkedHashSet.$plus$eq(LinkedHashSet.scala:59)
  at scala.collection.mutable.LinkedHashSet.$plus$eq(LinkedHashSet.scala:41)
  at scala.collection.mutable.GrowingBuilder.$plus$eq(GrowingBuilder.scala:26)
  at 

Simultaneous spark Jobs execution.

2016-07-08 Thread Mazen
Does Spark handle simultaneous execution of jobs within an application, or is
job execution blocking, i.e. a new job cannot be initiated until the previous
one commits?


 What does it mean that :  "Spark’s scheduler is fully thread-safe" 


Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Simultaneous-spark-Jobs-execution-tp27310.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RangePartitioning

2016-07-08 Thread tan shai
Hi,

Can any one explain to me the class RangePartitioning "
https://github.com/apache/spark/blob/d5911d1173fe0872f21cae6c47abf8ff479345a4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
"

case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
  extends Expression with Partitioning with Unevaluable {

  override def children: Seq[SortOrder] = ordering
  override def nullable: Boolean = false
  override def dataType: DataType = IntegerType

  override def satisfies(required: Distribution): Boolean = required match {
    case UnspecifiedDistribution => true
    case OrderedDistribution(requiredOrdering) =>
      val minSize = Seq(requiredOrdering.size, ordering.size).min
      requiredOrdering.take(minSize) == ordering.take(minSize)
    case ClusteredDistribution(requiredClustering) =>
      ordering.map(_.child).forall(x =>
        requiredClustering.exists(_.semanticEquals(x)))
    case _ => false
  }

  override def compatibleWith(other: Partitioning): Boolean = other match {
    case o: RangePartitioning => this.semanticEquals(o)
    case _ => false
  }

  override def guarantees(other: Partitioning): Boolean = other match {
    case o: RangePartitioning => this.semanticEquals(o)
    case _ => false
  }
}
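
As far as I understand (offered as a sketch, not an authoritative answer),
RangePartitioning describes the output partitioning of an operator whose rows have been
range-partitioned on a sort order, which is what a global sort produces, and satisfies()
is how the planner checks whether an extra exchange can be skipped. You can see it show
up in a plan like this (Spark 1.6 syntax):

// A global sort is planned with a range-partitioned shuffle; explain() prints an
// Exchange using rangepartitioning(id ASC, ...).
val df = sqlContext.range(0, 1000)
df.orderBy("id").explain()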



Re: Is the operation inside foreachRDD supposed to be blocking?

2016-07-08 Thread Sean Owen
It's no different than any other operation on an RDD. A transformation
doesn't actually do anything by itself, so does not block. An action
triggers computation and blocks until the action completes. You can wait
for it with a Future, sure.

On Fri, Jul 8, 2016 at 10:43 AM, Mikael Ståldal 
wrote:

> In a Spark Streaming job, is the operation inside foreachRDD supposed to
> synchronous / blocking?
>
> What if you do some asynchronous operation which returns a Future? Are you
> then supposed to do Await on that Future?
> --
> [image: MagineTV]
>
> *Mikael Ståldal*
> Senior software developer
>
> *Magine TV*
> mikael.stal...@magine.com
> Grev Turegatan 3  | 114 46 Stockholm, Sweden  |   www.magine.com
>
> Privileged and/or Confidential Information may be contained in this
> message. If you are not the addressee indicated in this message
> (or responsible for delivery of the message to such a person), you may not
> copy or deliver this message to anyone. In such case,
> you should destroy this message and kindly notify the sender by reply
> email.
>


Re: Why is KafkaUtils.createRDD offsetRanges an Array rather than a Seq?

2016-07-08 Thread Sean Owen
Java-friendliness is the usual reason, though I don't know if that's the
reason here.

On Fri, Jul 8, 2016 at 10:42 AM, Mikael Ståldal 
wrote:

> Is there any particular reason for the offsetRanges parameter to
> KafkaUtils.createRDD is Array[OffsetRange]? Why not Seq[OffsetRange]?
>
>
> http://spark.apache.org/docs/latest/api/scala/org/apache/spark/streaming/kafka/KafkaUtils$.html
>
>
> https://github.com/apache/spark/blob/master/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala#L248
>
> --
> [image: MagineTV]
>
> *Mikael Ståldal*
> Senior software developer
>
> *Magine TV*
> mikael.stal...@magine.com
> Grev Turegatan 3  | 114 46 Stockholm, Sweden  |   www.magine.com
>
> Privileged and/or Confidential Information may be contained in this
> message. If you are not the addressee indicated in this message
> (or responsible for delivery of the message to such a person), you may not
> copy or deliver this message to anyone. In such case,
> you should destroy this message and kindly notify the sender by reply
> email.
>
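
For what it's worth, a Seq works fine at the call site and only needs converting at the
boundary. A minimal sketch (topic name, offsets and kafkaParams are illustrative):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val ranges: Seq[OffsetRange] = Seq(OffsetRange("events", 0, 100L, 200L))
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, ranges.toArray)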


Re: Spark as sql engine on S3

2016-07-08 Thread Ashok Kumar
Hi,
As I said, we are using Hive as our SQL engine for the datasets, but we are
storing the data externally in Amazon S3.
Now you suggested the Spark Thrift Server.

I started the Spark Thrift Server on port 10001 and used beeline to access it:

Connecting to jdbc:hive2://<host>:10001
Connected to: Spark SQL (version 1.6.1)
Driver: Spark Project Core (version 1.6.1)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 1.6.1 by Apache Hive

Now can I access my external tables on S3 just as I do on Hive with beeline
connected to the Hive thrift server?
And is the advantage that using Spark SQL will be much faster?
regards

 

On Friday, 8 July 2016, 6:30, ayan guha  wrote:
 

 Yes, it can. 
On Fri, Jul 8, 2016 at 3:03 PM, Ashok Kumar  wrote:

thanks so basically Spark Thrift Server runs on a port much like beeline that 
uses JDBC to connect to Hive?
Can Spark thrift server access Hive tables?
regards 

On Friday, 8 July 2016, 5:27, ayan guha  wrote:
 

 Spark Thrift Server works as a JDBC server. You can connect to it from any
JDBC tool, like SQuirreL.
On Fri, Jul 8, 2016 at 3:50 AM, Ashok Kumar  
wrote:

Hello gurus,
We are storing data externally on Amazon S3
What is the optimum or best way to use Spark as SQL engine to access data on S3?
Any info/write up will be greatly appreciated.
Regards



-- 
Best Regards,
Ayan Guha


   



-- 
Best Regards,
Ayan Guha


  

Is the operation inside foreachRDD supposed to be blocking?

2016-07-08 Thread Mikael Ståldal
In a Spark Streaming job, is the operation inside foreachRDD supposed to be
synchronous / blocking?

What if you do some asynchronous operation which returns a Future? Are you
then supposed to do Await on that Future?
-- 
[image: MagineTV]

*Mikael Ståldal*
Senior software developer

*Magine TV*
mikael.stal...@magine.com
Grev Turegatan 3  | 114 46 Stockholm, Sweden  |   www.magine.com

Privileged and/or Confidential Information may be contained in this
message. If you are not the addressee indicated in this message
(or responsible for delivery of the message to such a person), you may not
copy or deliver this message to anyone. In such case,
you should destroy this message and kindly notify the sender by reply
email.


Why is KafkaUtils.createRDD offsetRanges an Array rather than a Seq?

2016-07-08 Thread Mikael Ståldal
Is there any particular reason for the offsetRanges parameter to
KafkaUtils.createRDD is Array[OffsetRange]? Why not Seq[OffsetRange]?

http://spark.apache.org/docs/latest/api/scala/org/apache/spark/streaming/kafka/KafkaUtils$.html

https://github.com/apache/spark/blob/master/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala#L248

-- 
[image: MagineTV]

*Mikael Ståldal*
Senior software developer

*Magine TV*
mikael.stal...@magine.com
Grev Turegatan 3  | 114 46 Stockholm, Sweden  |   www.magine.com

Privileged and/or Confidential Information may be contained in this
message. If you are not the addressee indicated in this message
(or responsible for delivery of the message to such a person), you may not
copy or deliver this message to anyone. In such case,
you should destroy this message and kindly notify the sender by reply
email.


Re: Is that possible to launch spark streaming application on yarn with only one machine?

2016-07-08 Thread Yu Wei
How could I dump data into text file? Writing to HDFS or other approach?


Thanks,

Jared
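
A minimal sketch of the HDFS text-file approach (paths are illustrative; dstream stands
for the MQTT DStream from this thread):

// Persist each micro-batch instead of print(), so the output is easy to find no
// matter where the driver runs (in yarn-cluster mode the driver's stdout ends up in
// the ApplicationMaster container log).
dstream.saveAsTextFiles("hdfs:///user/jared/mqtt-out/batch")

// Or with explicit per-batch control:
dstream.foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty()) {
    rdd.saveAsTextFile(s"hdfs:///user/jared/mqtt-out/${time.milliseconds}")
  }
}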


From: Rabin Banerjee 
Sent: Thursday, July 7, 2016 7:04:29 PM
To: Yu Wei
Cc: Mich Talebzadeh; user; Deng Ching-Mallete
Subject: Re: Is that possible to launch spark streaming application on yarn 
with only one machine?


In that case, I suspect that Mqtt is not getting data while you are submitting  
in yarn cluster .

Can you please try dumping data in text file instead of printing while 
submitting in yarn cluster mode.?

On Jul 7, 2016 12:46 PM, "Yu Wei" 
> wrote:

Yes. Thanks for your clarification.

The problem I encountered is that in yarn cluster mode, no output for 
"DStream.print()" in yarn logs.


In spark implementation org/apache/spark/streaming/dstream/DStream.scala, the 
logs related with "Time" was printed out. However, other information for 
firstNum.take(num).foreach(println) was not printed in logs.

What's the root cause for the behavior difference?


/**
   * Print the first ten elements of each RDD generated in this DStream. This 
is an output
   * operator, so this DStream will be registered as an output stream and there 
materialized.
   */
  def print(): Unit = ssc.withScope {
print(10)
  }

  /**
   * Print the first num elements of each RDD generated in this DStream. This 
is an output
   * operator, so this DStream will be registered as an output stream and there 
materialized.
   */
  def print(num: Int): Unit = ssc.withScope {
def foreachFunc: (RDD[T], Time) => Unit = {
  (rdd: RDD[T], time: Time) => {
val firstNum = rdd.take(num + 1)
// scalastyle:off println
println("---")
println("Time: " + time)
println("---")
firstNum.take(num).foreach(println)
if (firstNum.length > num) println("...")
println()
// scalastyle:on println
  }
}


Thanks,

Jared



From: Rabin Banerjee 
>
Sent: Thursday, July 7, 2016 1:04 PM
To: Yu Wei
Cc: Mich Talebzadeh; Deng Ching-Mallete; 
user@spark.apache.org
Subject: Re: Is that possible to launch spark streaming application on yarn 
with only one machine?

In yarn cluster mode , Driver is running in AM , so you can find the logs in 
that AM log . Open rersourcemanager UI , and check for the Job and logs. or 
yarn logs -applicationId 

In yarn client mode , the driver is the same JVM from where you are launching 
,,So you are getting it in the log .

On Thu, Jul 7, 2016 at 7:56 AM, Yu Wei 
> wrote:

Launching via client deploy mode, it works again.

I'm still a little confused about the behavior difference for cluster and 
client mode on a single machine.


Thanks,

Jared


From: Mich Talebzadeh 
>
Sent: Wednesday, July 6, 2016 9:46:11 PM
To: Yu Wei
Cc: Deng Ching-Mallete; user@spark.apache.org

Subject: Re: Is that possible to launch spark streaming application on yarn 
with only one machine?

Deploy-mode cluster don't think will work.

Try --master yarn --deploy-mode client

FYI


  *   Spark Local - Spark runs on the local host. This is the simplest set up 
and best suited for learners who want to understand different concepts of Spark 
and those performing unit testing.

  *   Spark Standalone – a simple cluster manager included with Spark that 
makes it easy to set up a cluster.

  *   YARN Cluster Mode, the Spark driver runs inside an application master 
process which is managed by YARN on the cluster, and the client can go away 
after initiating the application. This is invoked with –master yarn and 
--deploy-mode cluster

  *   YARN Client Mode, the driver runs in the client process, and the 
application master is only used for requesting resources from YARN. Unlike 
Spark standalone mode, in which the master’s address is specified in the 
--master parameter, in YARN mode the ResourceManager’s address is picked up 
from the Hadoop configuration. Thus, the --master parameter is yarn. This is 
invoked with --deploy-mode client

HTH


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.



On 6 July 2016 at 12:31, Yu Wei 

How to improve the performance for writing a data frame to a JDBC database?

2016-07-08 Thread Mungeol Heo
Hello,

I am trying to write a data frame to a JDBC database, like SQL server,
using spark 1.6.0.
The problem is "write.jdbc(url, table, connectionProperties)" is too slow.
Is there any way to improve the performance/speed?

e.g. options like partitionColumn, lowerBound, upperBound,
numPartitions which in read.jdbc and read.format("jdbc").

Any help will be great!!!
Thank you

- mungeol

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
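
If I read the 1.6 JDBC writer correctly, it opens one connection per DataFrame
partition, so the partition count is the main parallelism knob on the Spark side. A
sketch only (df, url, table name, credentials and the count of 16 are illustrative):

import java.util.Properties

val props = new Properties()
props.setProperty("user", "dbuser")
props.setProperty("password", "dbpass")

df.repartition(16)        // pick a value the target database can absorb
  .write
  .mode("append")
  .jdbc(url, "target_table", props)

Beyond that, batching behaviour mostly depends on the JDBC driver, so driver-specific
options are worth checking; for very large loads the database's own bulk-load path is
usually much faster than JDBC inserts.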



Re: Graphframe Error

2016-07-08 Thread Felix Cheung
I ran it with Python 2.





On Thu, Jul 7, 2016 at 4:13 AM -0700, "Arun Patel" 
> wrote:

I have tried this already.  It does not work.

What version of Python is needed for this package?

On Wed, Jul 6, 2016 at 12:45 AM, Felix Cheung 
> wrote:
This could be the workaround:

http://stackoverflow.com/a/36419857




On Tue, Jul 5, 2016 at 5:37 AM -0700, "Arun Patel" 
> wrote:

Thanks Yanbo and Felix.

I tried these commands on CDH Quickstart VM and also on "Spark 1.6 pre-built 
for Hadoop" version.  I am still not able to get it working.  Not sure what I 
am missing.  Attaching the logs.




On Mon, Jul 4, 2016 at 5:33 AM, Felix Cheung 
> wrote:
It looks like either the extracted Python code is corrupted or there is a 
mismatch Python version. Are you using Python 3?


stackoverflow.com/questions/514371/whats-the-bad-magic-number-error





On Mon, Jul 4, 2016 at 1:37 AM -0700, "Yanbo Liang" 
> wrote:

Hi Arun,

The command

bin/pyspark --packages graphframes:graphframes:0.1.0-spark1.6

will automatically load the required graphframes jar file from maven 
repository, it was not affected by the location where the jar file was placed. 
Your examples works well in my laptop.

Or you can use try with


bin/pyspark --py-files ***/graphframes.jar --jars ***/graphframes.jar

to launch PySpark with graphframes enabled. You should set "--py-files" and 
"--jars" options with the directory where you saved graphframes.jar.

Thanks
Yanbo


2016-07-03 15:48 GMT-07:00 Arun Patel 
>:
I started my pyspark shell with command  (I am using spark 1.6).

bin/pyspark --packages graphframes:graphframes:0.1.0-spark1.6

I have copied 
http://dl.bintray.com/spark-packages/maven/graphframes/graphframes/0.1.0-spark1.6/graphframes-0.1.0-spark1.6.jar
 to the lib directory of Spark as well.

I was getting below error

>>> from graphframes import *
Traceback (most recent call last):
  File "", line 1, in 
zipimport.ZipImportError: can't find module 'graphframes'
>>>

So, as per suggestions from similar questions, I have extracted the graphframes 
python directory and copied to the local directory where I am running pyspark.

>>> from graphframes import *

But, not able to create the GraphFrame

>>> g = GraphFrame(v, e)
Traceback (most recent call last):
  File "", line 1, in 
NameError: name 'GraphFrame' is not defined

Also, I am getting below error.
>>> from graphframes.examples import Graphs
Traceback (most recent call last):
  File "", line 1, in 
ImportError: Bad magic number in graphframes/examples.pyc

Any help will be highly appreciated.

- Arun





Re: Bug about reading parquet files

2016-07-08 Thread Cheng Lian
What's the Spark version? Could you please also attach result of
explain(extended = true)?

On Fri, Jul 8, 2016 at 4:33 PM, Sea <261810...@qq.com> wrote:

> I have a problem reading parquet files.
> sql:
> select count(1) from   omega.dwd_native where year='2016' and month='07'
> and day='05' and hour='12' and appid='6';
> The hive partition is (year,month,day,appid)
>
> only two tasks, and it will list all directories in my table, not only
> /user/omega/events/v4/h/2016/07/07/12/appid=6
> [Stage 1:>  (0 +
> 0) / 2]
>
> 16/07/08 16:16:51 INFO sources.HadoopFsRelation: Listing 
> hdfs://mycluster-tj/user/omega/events/v4/h/2016/05/31/21/appid=1
> 16/07/08 16:16:51 INFO sources.HadoopFsRelation: Listing 
> hdfs://mycluster-tj/user/omega/events/v4/h/2016/06/28/20/appid=2
>
> 16/07/08 16:16:51 INFO sources.HadoopFsRelation: Listing 
> hdfs://mycluster-tj/user/omega/events/v4/h/2016/07/22/21/appid=65537
> 16/07/08 16:16:51 INFO sources.HadoopFsRelation: Listing 
> hdfs://mycluster-tj/user/omega/events/v4/h/2016/08/14/05/appid=65536
>
>


Bug about reading parquet files

2016-07-08 Thread Sea
I have a problem reading parquet files.
sql:
select count(1) from   omega.dwd_native where year='2016' and month='07' and 
day='05' and hour='12' and appid='6';
The hive partition is (year,month,day,appid)


only two tasks, and it will list all directories in my table, not only 
/user/omega/events/v4/h/2016/07/07/12/appid=6
[Stage 1:>  (0 + 0) / 2]


16/07/08 16:16:51 INFO sources.HadoopFsRelation: Listing 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/05/31/21/appid=1 16/07/08 
16:16:51 INFO sources.HadoopFsRelation: Listing 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/06/28/20/appid=2 16/07/08 
16:16:51 INFO sources.HadoopFsRelation: Listing 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/07/22/21/appid=65537 16/07/08 
16:16:51 INFO sources.HadoopFsRelation: Listing 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/08/14/05/appid=65536

Re: Extend Dataframe API

2016-07-08 Thread Rishi Mishra
Or , you can extend SQLContext to add your plans . Not sure if it fits your
requirement , but answered to highlight an option.

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Thu, Jul 7, 2016 at 8:39 PM, tan shai  wrote:

> That was what I am thinking to do.
>
> Do you have any idea about this? Or any documentations?
>
> Many thanks.
>
> 2016-07-07 17:07 GMT+02:00 Koert Kuipers :
>
>> i dont see any easy way to extend the plans, beyond creating a custom
>> version of spark.
>>
>> On Thu, Jul 7, 2016 at 9:31 AM, tan shai  wrote:
>>
>>> Hi,
>>>
>>> I need to add new operations to the dataframe API.
>>> Can any one explain to me how to extend the plans of query execution?
>>>
>>> Many thanks.
>>>
>>
>>
>
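
A related route that avoids forking Spark: registering an extra planning strategy on
the existing SQLContext (an experimental, developer-level API). A skeleton, sketched
under the assumption of Spark 1.6-era internals; MyStrategy is a placeholder and the
real planning rules would replace the empty match:

import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

object MyStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // Match your new logical operator here and return its physical plan; returning
    // Nil defers to the built-in strategies, so registering this skeleton is harmless.
    case _ => Nil
  }
}

sqlContext.experimental.extraStrategies =
  MyStrategy +: sqlContext.experimental.extraStrategies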


Re: Multiple aggregations over streaming dataframes

2016-07-08 Thread Arnaud Bailly
Thanks for your answers. I know Kafka's model but I would rather like to
avoid having to setup both Spark and Kafka to handle my use case. I wonder
if it might be possible to handle that using Spark's standard streams ?

-- 
Arnaud Bailly

twitter: abailly
skype: arnaud-bailly
linkedin: http://fr.linkedin.com/in/arnaudbailly/

On Fri, Jul 8, 2016 at 12:00 AM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> Kafka has an interesting model that might be applicable.
>
> You can think of kafka as enabling a queue system. Writes are called
> producers, and readers are called consumers. The server is called a broker.
> A “topic” is like a named queue.
>
> Producer are independent. They can write to a “topic” at will. Consumers
> (I.e. You nested aggregates) need to be independent of each other and the
> broker. The broker receives data from produces stores it using memory and
> disk. Consumer read from broker and maintain the cursor. Because the client
> maintains the cursor one consumer can not impact other produces and
> consumers.
>
> I would think the tricky part for spark would to know when the data can be
> deleted. In the Kakfa world each topic is allowed to define a TTL SLA. I.e.
> The consumer must read the data with in a limited of window of time.
>
> Andy
>
> From: Michael Armbrust 
> Date: Thursday, July 7, 2016 at 2:31 PM
> To: Arnaud Bailly 
> Cc: Sivakumaran S , "user @spark" <
> user@spark.apache.org>
> Subject: Re: Multiple aggregations over streaming dataframes
>
> We are planning to address this issue in the future.
>
> At a high level, we'll have to add a delta mode so that updates can be
> communicated from one operator to the next.
>
> On Thu, Jul 7, 2016 at 8:59 AM, Arnaud Bailly 
> wrote:
>
>> Indeed. But nested aggregation does not work with Structured Streaming,
>> that's the point. I would like to know if there is workaround, or what's
>> the plan regarding this feature which seems to me quite useful. If the
>> implementation is not overtly complex and it is just a matter of manpower,
>> I am fine with devoting some time to it.
>>
>>
>>
>> --
>> Arnaud Bailly
>>
>> twitter: abailly
>> skype: arnaud-bailly
>> linkedin: http://fr.linkedin.com/in/arnaudbailly/
>>
>> On Thu, Jul 7, 2016 at 2:17 PM, Sivakumaran S 
>> wrote:
>>
>>> Arnauld,
>>>
>>> You could aggregate the first table and then merge it with the second
>>> table (assuming that they are similarly structured) and then carry out the
>>> second aggregation. Unless the data is very large, I don’t see why you
>>> should persist it to disk. IMO, nested aggregation is more elegant and
>>> readable than a complex single stage.
>>>
>>> Regards,
>>>
>>> Sivakumaran
>>>
>>>
>>>
>>> On 07-Jul-2016, at 1:06 PM, Arnaud Bailly 
>>> wrote:
>>>
>>> It's aggregation at multiple levels in a query: first do some
>>> aggregation on one tavle, then join with another table and do a second
>>> aggregation. I could probably rewrite the query in such a way that it does
>>> aggregation in one pass but that would obfuscate the purpose of the various
>>> stages.
>>> Le 7 juil. 2016 12:55, "Sivakumaran S"  a écrit :
>>>
 Hi Arnauld,

 Sorry for the doubt, but what exactly is multiple aggregation? What is
 the use case?

 Regards,

 Sivakumaran


 On 07-Jul-2016, at 11:18 AM, Arnaud Bailly 
 wrote:

 Hello,

 I understand multiple aggregations over streaming dataframes is not
 currently supported in Spark 2.0. Is there a workaround? Out of the top of
 my head I could think of having a two stage approach:
  - first query writes output to disk/memory using "complete" mode
  - second query reads from this output

 Does this makes sense?

 Furthermore, I would like to understand what are the technical hurdles
 that are preventing Spark SQL from implementing multiple aggregation right
 now?

 Thanks,
 --
 Arnaud Bailly

 twitter: abailly
 skype: arnaud-bailly
 linkedin: http://fr.linkedin.com/in/arnaudbailly/



>>>
>>
>


Re: Processing json document

2016-07-08 Thread Jörn Franke
You are correct, although I think there is a way to properly identify the json
even in case it is split (I think this should be supported by the json
parser).
Nevertheless, as an exchange format in Big Data platforms you should use Avro, and
for tabular analytics ORC or Parquet...
That said, formats for other structures are still missing (e.g.
graph structures, tree structures, etc.)

> On 07 Jul 2016, at 19:06, Yong Zhang  wrote:
> 
> The problem is for Hadoop Input format to identify the record delimiter. If 
> the whole json record is in one line, then the nature record delimiter will 
> be the new line character. 
> 
> Keep in mind in distribute file system, the file split position most likely 
> IS not on the record delimiter. The input format implementation has to go 
> back or forward in the bytes array looking for the next record delimiter on 
> another node. 
> 
> Without a perfect record delimiter, then you just has to parse the whole 
> file, as you know the file boundary is a reliable record delimiter.
> 
> JSON is Never a good format to be stored in BigData platform. If your source 
> json is liking this, then you have to preprocess it. Or write your own 
> implementation to handle the record delimiter, for your json data case. But 
> good luck with that. There is no perfect generic solution for any kind of 
> JSON data you want to handle.
> 
> Yong
> 
> From: ljia...@gmail.com
> Date: Thu, 7 Jul 2016 11:57:26 -0500
> Subject: Re: Processing json document
> To: gurwls...@gmail.com
> CC: jornfra...@gmail.com; user@spark.apache.org
> 
> Hi, there,
> 
> Thank you all for your input. @Hyukjin, as a matter of fact, I have read the 
> blog link you posted before asking the question on the forum. As you pointed 
> out, the link uses wholeTextFiles(0, which is bad in my case, because my json 
> file can be as large as 20G+ and OOM might occur. I am not sure how to 
> extract the value by using textFile call as it will create an RDD of string 
> and treat each line without ordering. It destroys the json context. 
> 
> Large multiline json file with parent node are very common in the real world. 
> Take the common employees json example below, assuming we have millions of 
> employee and it is super large json document, how can spark handle this? This 
> should be a common pattern, shouldn't it? In real world, json document does 
> not always come as cleanly formatted as the spark example requires. 
> 
> {
> "employees":[
> {
>   "firstName":"John", 
>   "lastName":"Doe"
> },
> {
>   "firstName":"Anna", 
>"lastName":"Smith"
> },
> {
>"firstName":"Peter", 
> "lastName":"Jones"}
> ]
> }
> 
> 
> 
> On Thu, Jul 7, 2016 at 1:47 AM, Hyukjin Kwon  wrote:
> The link uses wholeTextFiles() API which treats each file as each record.
> 
> 
> 2016-07-07 15:42 GMT+09:00 Jörn Franke :
> This does not need necessarily the case if you look at the Hadoop 
> FileInputFormat architecture then you can even split large multi line Jsons 
> without issues. I would need to have a look at it, but one large file does 
> not mean one Executor independent of the underlying format.
> 
> On 07 Jul 2016, at 08:12, Hyukjin Kwon  wrote:
> 
> There is a good link for this here, 
> http://searchdatascience.com/spark-adventures-1-processing-multi-line-json-files
> 
> If there are a lot of small files, then it would work pretty okay in a 
> distributed manner, but I am worried if it is single large file.
> 
> In this case, this would only work in single executor which I think will end 
> up with OutOfMemoryException.
> 
> Spark JSON data source does not support multi-line JSON as input due to the 
> limitation of TextInputFormat and LineRecordReader.
> 
> You may have to just extract the values after reading it by textFile..
> 
> 
> 
> 2016-07-07 14:48 GMT+09:00 Lan Jiang :
> Hi, there
> 
> Spark has provided json document processing feature for a long time. In most 
> examples I see, each line is a json object in the sample file. That is the 
> easiest case. But how can we process a json document, which does not conform 
> to this standard format (one line per json object)? Here is the document I am 
> working on. 
> 
> First of all, it is multiple lines for one single big json object. The real 
> file can be as long as 20+ G. Within that one single json object, it contains 
> many name/value pairs. The name is some kind of id values. The value is the 
> actual json object that I would like to be part of dataframe. Is there any 
> way to do that? Appreciate any input. 
> 
> 
> {
> "id1": {
> "Title":"title1",
> "Author":"Tom",
> "Source":{
> "Date":"20160506",
> "Type":"URL"
> },
> "Data":" blah blah"},
> 
> "id2": {
> "Title":"title2",
> "Author":"John",
> "Source":{
> "Date":"20150923",
> 

Re: Memory grows exponentially

2016-07-08 Thread Jörn Franke
Memory fragmentation? Quite common with in-memory systems.

> On 08 Jul 2016, at 08:56, aasish.kumar  wrote:
> 
> Hello everyone:
> 
> I have been facing a problem associated spark streaming memory.
> 
> I have been running two Spark Streaming jobs concurrently. The jobs read
> data from Kafka with a batch interval of 1 minute, performs aggregation, and
> sinks the computed data to MongoDB using using stratio-mongodb connector.
> 
> I have setup the spark standalone cluster on AWS. My setup is configured as
> follows: I have a four-node cluster. One node as a master, and the rest
> 3-nodes as workers, while each worker has only one executor, with 2-cores
> and 8GB of RAM.
> 
> Currently, I am processing seven-hundred thousand JSON events, every minute.
> After running the jobs for 3-4 hours, I have observed that the memory
> consumption keeps growing, exiting one of the jobs.
> 
> Despite setting /spark.cleaner.ttl/ for 600 seconds, and having used
> /rdd.unpersist/ method at the end of the job. I am not able to understand
> why the memory consumption keeps growing over time. I am unable solve this
> problem. I would appreciate if someone can help me solve or provide
> redirections as to why this is happening.
> 
> Thank you.
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Memory-grows-exponentially-tp27308.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Memory grows exponentially

2016-07-08 Thread aasish.kumar
Hello everyone:

I have been facing a problem associated with Spark Streaming memory.

I have been running two Spark Streaming jobs concurrently. The jobs read
data from Kafka with a batch interval of 1 minute, perform aggregation, and
sink the computed data to MongoDB using the stratio-mongodb connector.

I have set up a Spark standalone cluster on AWS. My setup is configured as
follows: I have a four-node cluster, with one node as the master and the other
3 nodes as workers, while each worker has only one executor, with 2 cores
and 8GB of RAM.

Currently, I am processing seven hundred thousand JSON events every minute.
After running the jobs for 3-4 hours, I have observed that the memory
consumption keeps growing, eventually causing one of the jobs to exit.

Despite setting /spark.cleaner.ttl/ to 600 seconds and calling the
/rdd.unpersist/ method at the end of the job, I am not able to understand
why the memory consumption keeps growing over time. I am unable to solve this
problem. I would appreciate it if someone could help me solve it or provide
pointers as to why this is happening.

Thank you.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Memory-grows-exponentially-tp27308.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Re: Re: how to select first 50 value of each group after group by?

2016-07-08 Thread luohui20001

Thank you Anton, I got my problem solved with the code below:

val hivetable = hc.sql("select * from house_sale_pv_location")
val overLocation = Window.partitionBy(hivetable.col("lp_location_id"))
val sortedDF = hivetable.withColumn("rowNumber",
  row_number().over(overLocation)).filter("rowNumber<=50")
sortedDF.write.saveAsTable("house_id_pv_location_top50")

Thank you guys.



 

Thanks
Best regards!
San.Luo
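
A small variation on the solution above, offered only as a sketch (it reuses hivetable,
the pv column and the table names from this thread): putting the ordering inside the
window spec instead of relying on the table's physical sort order, which Spark may not
preserve across reads and shuffles.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val byLocation = Window.partitionBy(hivetable.col("lp_location_id"))
                       .orderBy(hivetable.col("pv").desc)
val top50 = hivetable.withColumn("rowNumber", row_number().over(byLocation))
                     .filter("rowNumber <= 50")
top50.write.saveAsTable("house_id_pv_location_top50")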

- Original Message -
From: Anton Okolnychyi 
To: 罗辉 , user 
Subject: Re: Re: how to select first 50 value of each group after group by?
Date: 2016-07-07 20:38

Hi,

I can try to guess what is wrong, but I might be incorrect.

You should be careful with window frames (you define them via the rowsBetween()
method).

In my understanding, all window functions can be divided into 2 groups:
- functions defined by the
  org.apache.spark.sql.catalyst.expressions.WindowFunction trait ("true" window
  functions)
- all other supported functions that are marked as window functions by providing
  a window specification.

The main distinction is that functions from the first group might have a
predefined internal frame. That's exactly your case. Both row_number() and
rank() are from the first group (i.e. they have predefined internal frames).

To make your case work, you have 2 options:
- remove your own frame specification (i.e. rowsBetween(0, 49)) and use only
  Window.partitionBy(hivetable.col("location"))
- state explicitly correct window frames, for instance rowsBetween(Long.MinValue, 0)
  for rank() and row_number().

By the way, there is not too much documentation on how Spark resolves window
frames. For that reason, I created a small pull request that can help:
https://github.com/apache/spark/pull/14050
It would be nice if anyone experienced can take a look at it since it is based
only on my own analysis.
2016-07-07 13:26 GMT+02:00  :
hi Anton:  I check the docs you mentioned, and have code accordingly, 
however met an exception like "org.apache.spark.sql.AnalysisException: Window 
function row_number does not take a frame specification.;"  It Seems that 
the row_number API is giving a global row numbers of every row across all 
frames, by my understanding. If wrong,please correct me.  I checked all the 
window function API of 
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$,
 and just found that maybe row_number() seems matches. I am not quit sure about 
it.
here is my code:

val hc = new org.apache.spark.sql.hive.HiveContext(sc)
val hivetable = hc.sql("select * from house_sale_pv_location")
val overLocation = 
Window.partitionBy(hivetable.col("location")).rowsBetween(0, 49)
val sortedDF = hivetable.withColumn("rowNumber", 
row_number().over(overLocation))
sortedDF.registerTempTable("sortedDF")
val top50 = hc.sql("select id,location from sortedDF where rowNumber<=50")
top50.registerTempTable("top50")
hc.sql("select * from top50 where location=30").collect.foreach(println)

here, hivetable is a DF that I mentioned with 3 columns  "id , pv, location", 
which is already sorted by pv in desc. So I didn't call orderby in the 3rd line 
of my code. I just want the first 50 rows, based on  physical location, of each 
frame.
To Tal:  I tried rank API, however this is not the API I want , because 
there are some values have same pv are ranked as same values. And first 50 rows 
of each frame is what I'm expecting. the attached file shows what I got by 
using rank.   Thank you anyway, I learnt what rank could provide from your 
advice.



 

ThanksBest regards!
San.Luo

- Original Message -
From: Anton Okolnychyi 
To: user 
Cc: luohui20...@sina.com
Subject: Re: how to select first 50 value of each group after group by?
Date: 2016-07-06 23:22

The following resources should be useful:
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-windows.html

The last link should have the exact solution
2016-07-06 16:55 GMT+02:00 Tal Grynbaum :
You can use the rank window function to rank each row in the group, and then
filter the rows with rank < 50.

On Wed, Jul 6, 2016, 14:07   wrote:
hi there,
I have a DF with 3 columns: id, pv, location (the rows are already grouped by
location and sorted by pv in desc). I want to get the first 50 id values grouped
by location. I checked the API of DataFrame, GroupedData, and pairRDD, and found
no match. Is there a way to do this naturally? Any info will be appreciated.




 

ThanksBest regards!
San.Luo







-

To unsubscribe e-mail: