Re: Is there any tool provides per-task monitoring to figure out task skew in Spark streaming?

2015-09-28 Thread Tathagata Das
You can look at the task details in the Spark UI to see how many bytes each
of the tasks in the skewed stages processes. That would be a good place to
start.
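
Once the per-task input sizes have been read off the UI, a quick way to put a number on the skew is to compare the largest task with the median one. The sketch below is only an illustration: the byte counts are made-up values and `skew_ratio` is a hypothetical helper, not part of Spark.

```python
from statistics import median

def skew_ratio(bytes_per_task):
    """Return max / median of per-task input sizes (1.0 means perfectly even)."""
    mid = median(bytes_per_task)
    return max(bytes_per_task) / mid if mid else float("inf")

# Hypothetical per-task input sizes (bytes), copied by hand from one stage in the UI.
stage_tasks = [64_000_000, 66_000_000, 63_000_000, 410_000_000]
ratio = skew_ratio(stage_tasks)
print(f"max/median = {ratio:.1f}")
```

A ratio well above 1 for the same stage across several batches would suggest that one partition is consistently heavier than the rest.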

On Mon, Sep 28, 2015 at 7:59 PM, 이기석  wrote:

> Hi, I am a graduate student studying Spark Streaming for research
> purposes.
>
> I want to know whether there is task skew in my streaming application.
> But as far as I can tell, the Spark UI does not provide any useful
> information for figuring this out.
>
> I found a related work from Spark Summit 2014:
>
> *Sparkling: Identification of Task Skew and Speculative Partition of Data
> for Spark applications*
> (
> https://spark-summit.org/2014/talk/sparkling-identification-of-task-skew-and-speculative-partition-of-data-for-spark-applications
> )
>
> However, it does not seem to be open for public use.
> Is there any useful tool that I can use to find task skew?
>
>
> Thanks in advance.
>


A non-canonical use of the Spark computation model

2015-09-28 Thread Blarvomere
The closest use case to what I will describe, I believe, is the real-time ad
serving that Yahoo is doing.

I am looking into using Spark as a sub-second latency decision engine
service that a user-facing application calls, maybe via the Livy REST server
or spark-jobserver. Instead of terabytes of data to sift through, it need
only be a few GB or less. My question is: where on the spectrum from "an
interesting use of Spark, and there are some examples of projects that do
this type of thing" to "that's so far outside of what Spark was designed for
that it's probably not the direction to go" is this idea?

Here is a silly example to illustrate what I mean. Sorry if it is long-winded,
but I just want to be clear on the type of iterative algorithm that Spark
seems to be well suited for:

Rich song data objects (30 fields per song) are exposed as an RDD of 100,000
to 1 million songs. We at Songiato think we can make the best 10 selections to
put on a user's playlist based on the user's real-time context, i.e. mood, play
history, heart rate (so it cannot be pre-computed), how many cats he/she owns,
etc.

The call to the Spark decision engine is made and includes those context
variables. Songiato's secret algorithms are a series of mapping steps that
compute a score for each song, followed by a single fold on the top score to
choose the first song. This updates the history and thus the context, so 9 more
iterations happen to complete the 10-song selection.
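
To make the shape of that loop concrete, here is a minimal sketch in plain Python (no Spark). The scoring rule, the field names and `pick_playlist` are all made up for illustration; on an RDD the map and the fold would be the corresponding RDD operations.

```python
from functools import reduce

def score(song, context):
    # Hypothetical scoring rule: reward a mood match, heavily penalise repeats.
    s = 1.0 if song["mood"] == context["mood"] else 0.0
    return s - (2.0 if song["id"] in context["history"] else 0.0)

def pick_playlist(songs, context, n=10):
    # Work on a copy so the caller's context is left untouched.
    ctx = dict(context, history=list(context["history"]))
    playlist = []
    for _ in range(n):
        scored = map(lambda song: (score(song, ctx), song), songs)     # map step
        best = reduce(lambda a, b: a if a[0] >= b[0] else b, scored)   # fold on top score
        playlist.append(best[1]["id"])
        ctx["history"].append(best[1]["id"])   # the pick changes the context
    return playlist

songs = [
    {"id": 1, "mood": "calm"},
    {"id": 2, "mood": "loud"},
    {"id": 3, "mood": "calm"},
]
playlist = pick_playlist(songs, {"mood": "calm", "history": []}, n=3)
```

Each iteration depends on the previous pick, so the 10 selections are 10 sequential map-and-fold passes over the song set.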

Thank you ahead of time for any help. I've been pulling my hair out trying to
decide if Spark is the right tool for the job!

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/A-non-canonical-use-of-the-Spark-computation-model-tp24855.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkContext._active_spark_context returns None

2015-09-28 Thread YiZhi Liu
Hi Ted,

Thank you for the reply. The sc works on the driver, but how can I reach the
JVM in rdd.map?
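
As far as I understand it, the Py4J gateway behind sc._jvm lives only in the driver process, which would explain why SparkContext._active_spark_context is None inside a function shipped to the workers. For a conversion as simple as this one, a plain Python function avoids the JVM altogether. A minimal sketch, where `parse_number` is a made-up helper name:

```python
def parse_number(s):
    """Pure-Python stand-in for java.lang.Integer.valueOf(s.strip())."""
    return int(s.strip())

# In the pyspark shell this would be used as:
#   numbers = sc.parallelize(["123", "234"]).map(parse_number).collect()
# Here the same function is applied locally so the sketch runs without Spark.
numbers = [parse_number(s) for s in ["123", " 234 "]]
```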

2015-09-29 11:26 GMT+08:00 Ted Yu :
> >>> sc._jvm.java.lang.Integer.valueOf("12")
> 12
>
> FYI
>
> On Mon, Sep 28, 2015 at 8:08 PM, YiZhi Liu  wrote:
>>
>> Hi,
>>
>> I'm doing some data processing on pyspark, but I failed to reach JVM
>> in workers. Here is what I did:
>>
>> $ bin/pyspark
>> >>> data = sc.parallelize(["123", "234"])
>> >>> numbers = data.map(lambda s: SparkContext._active_spark_context._jvm.java.lang.Integer.valueOf(s.strip()))
>> >>> numbers.collect()
>>
>> I got,
>>
>> Caused by: org.apache.spark.api.python.PythonException: Traceback
>> (most recent call last):
>>   File
>> "/mnt/hgfs/lewis/Workspace/source-codes/spark/python/lib/pyspark.zip/pyspark/worker.py",
>> line 111, in main
>> process()
>>   File
>> "/mnt/hgfs/lewis/Workspace/source-codes/spark/python/lib/pyspark.zip/pyspark/worker.py",
>> line 106, in process
>> serializer.dump_stream(func(split_index, iterator), outfile)
>>   File
>> "/mnt/hgfs/lewis/Workspace/source-codes/spark/python/lib/pyspark.zip/pyspark/serializers.py",
>> line 263, in dump_stream
>> vs = list(itertools.islice(iterator, batch))
>>   File "<stdin>", line 1, in <lambda>
>> AttributeError: 'NoneType' object has no attribute '_jvm'
>>
>> at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
>> at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
>> at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> ... 1 more
>>
>> While _jvm at the driver end looks fine:
>>
>> >>> SparkContext._active_spark_context._jvm.java.lang.Integer.valueOf("123".strip())
>> 123
>>
>> The program is trivial, I just wonder what is the right way to reach
>> JVM in python. Any help would be appreciated.
>>
>> Thanks
>>
>> --
>> Yizhi Liu
>> Senior Software Engineer / Data Mining
>> www.mvad.com, Shanghai, China
>>
>>
>



-- 
Yizhi Liu
Senior Software Engineer / Data Mining
www.mvad.com, Shanghai, China




Re: Is MLBase dead?

2015-09-28 Thread Justin Pihony
To take a stab at my own answer: MLBase is now fully integrated into MLlib.
MLI/MLlib are the mllib algorithms and MLO is the ml pipelines?

On Mon, Sep 28, 2015 at 10:19 PM, Justin Pihony 
wrote:

> As in, is MLBase (MLO/MLI/MLlib) now simply org.apache.spark.mllib and
> org.apache.spark.ml? I cannot find anything official, and the last updates
> seem to be a year or two old.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-MLBase-dead-tp24854.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: spark-submit classloader issue...

2015-09-28 Thread Aniket Bhatnagar
Hi Rachna

Can you just use the HTTP client provided via Spark's transitive dependencies
instead of excluding them?

The reason user classpath first is failing could be that you have Spark
artifacts in your assembly jar that don't match what is deployed
(version mismatch, or you built the version yourself, etc.).

Thanks,
Aniket

On Tue, Sep 29, 2015, 7:31 AM Rachana Srivastava <
rachana.srivast...@markmonitor.com> wrote:

> Hello all,
>
> *Goal:*  I want to use APIs from HttpClient library 4.4.1.  I am using
> maven shaded plugin to generate JAR.
>
>
>
> *Findings:* When I run my program as a *java application within eclipse
> everything works fine*.  But when I am running the program using
> *spark-submit* I am getting following error:
>
> URL content Could not initialize class
> org.apache.http.conn.ssl.SSLConnectionSocketFactory
>
> java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.http.conn.ssl.SSLConnectionSocketFactory
>
>
>
> When I tried to get the referred JAR it is pointing to some Hadoop JAR,  I
> am assuming this is something set in spark-submit.
>
>
>
> ClassLoader classLoader = HttpEndPointClient.class.getClassLoader();
>
> URL resource =
> classLoader.getResource("org/apache/http/message/BasicLineFormatter.class");
>
> Prints following jar:
>
>
> jar:file:/usr/lib/hadoop/lib/httpcore-4.2.5.jar!/org/apache/http/message/BasicLineFormatter.class
>
>
>
> After research I found that I can override *--conf
> spark.files.userClassPathFirst=true --conf
> spark.yarn.user.classpath.first=true*
>
>
>
> But when I do that I am getting following error:
>
> ERROR: org.apache.spark.executor.Executor - Exception in task 0.0 in stage
> 0.0 (TID 0)
>
> java.io.InvalidClassException: org.apache.spark.scheduler.Task; local
> class incompatible: stream classdesc serialVersionUID =
> -4703555755588060120, local class serialVersionUID = -1589734467697262504
>
> at
> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
>
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
>
> at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
>
> at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>
> at
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>
> at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68)
>
> at
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:94)
>
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:185)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
> at java.lang.Thread.run(Thread.java:745)
>
>
>
> I am running on CDH 5.4  Here is my complete pom file.
>
>
>
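> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>   <modelVersion>4.0.0</modelVersion>
>   <groupId>test</groupId>
>   <artifactId>test</artifactId>
>   <version>0.0.1-SNAPSHOT</version>
>   <dependencies>
>     <dependency>
>       <groupId>org.apache.httpcomponents</groupId>
>       <artifactId>httpcore</artifactId>
>       <version>4.4.1</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.httpcomponents</groupId>
>       <artifactId>httpclient</artifactId>
>       <version>4.4.1</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-streaming-kafka_2.10</artifactId>
>       <version>1.5.0</version>
>       <exclusions>
>         <exclusion>
>           <artifactId>httpcore</artifactId>
>           <groupId>org.apache.httpcomponents</groupId>
>         </exclusion>
>       </exclusions>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-streaming_2.10</artifactId>
>       <version>1.5.0</version>
>       <exclusions>
>         <exclusion>
>           <artifactId>httpcore</artifactId>
>           <groupId>org.apache.httpcomponents</groupId>
>         </exclusion>
>       </exclusions>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-core_2.10</artifactId>
>       <version>1.5.0</version>
>       <exclusions>
>         <exclusion>
>           <artifactId>httpcore</artifactId>
>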

Re: flatmap() and spark performance

2015-09-28 Thread Hemant Bhanawat
You can use spark.executor.memory to specify the memory of the executors
which will hold these intermediate results.

You may want to look at the section "Understanding Memory Management in
Spark" of this link:

https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html


On Tue, Sep 29, 2015 at 10:51 AM, jeff saremi 
wrote:

> Is there any way to let Spark know ahead of time what size of RDD to expect
> as a result of a flatMap() operation?
> And would that help in terms of performance?
> For instance, if I have an RDD of 1 million rows and I know that my
> flatMap() will produce 100 million rows, is there a way to indicate that to
> Spark, to say "reserve" space for the resulting RDD?
>
> thanks
> Jeff
>


RE: nested collection object query

2015-09-28 Thread Tridib Samanta
Thanks for your response, Yong! The array syntax works fine, but I am not sure how to
use explode. Should I use it as follows?
select id from department where explode(employee).name = 'employee0'
 
This query gives me java.lang.UnsupportedOperationException. I am using
HiveContext.
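
For what it is worth, in Hive explode is a table-generating function and is normally used together with LATERAL VIEW rather than inside a WHERE clause. The sketch below holds one possible rewrite of the query as a string (the aliases d, e and emp are my own choices, and I have not run it against this schema), followed by the same filtering logic in plain Python on made-up rows.

```python
# Assumed rewrite of the query using Hive's LATERAL VIEW syntax; the table and
# column names are taken from the messages in this thread.
query = """
SELECT DISTINCT d.id
FROM department d
LATERAL VIEW explode(d.employee) e AS emp
WHERE emp.name = 'employee0'
"""
# With a HiveContext this would be run as: hiveContext.sql(query)

# The same semantics in plain Python, on hypothetical rows:
departments = [
    {"id": "d0", "employee": [{"id": "e0", "name": "employee0"}]},
    {"id": "d1", "employee": [{"id": "e1", "name": "employee1"}]},
]
matching = sorted({d["id"] for d in departments
                   for emp in d["employee"]          # "explode" the array
                   if emp["name"] == "employee0"})
```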
 
From: java8...@hotmail.com
To: tridib.sama...@live.com; user@spark.apache.org
Subject: RE: nested collection object query
Date: Mon, 28 Sep 2015 20:42:11 -0400




Your employee is in fact an array of structs, not just a struct.
If you are using HiveContext, then you can refer to it as follows:
select id from member where employee[0].name = 'employee0'
The employee[0] points to the 1st element of the array.
If you want to query all the elements in the array, then you have to use
"explode" in Hive.
See the Hive documentation for this:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-explode
Yong

> Date: Mon, 28 Sep 2015 16:37:23 -0700
> From: tridib.sama...@live.com
> To: user@spark.apache.org
> Subject: nested collection object query
> 
> Hi Friends,
> What is the right syntax to query on collection of nested object? I have a
> following schema and SQL. But it does not return anything. Is the syntax
> correct?
> 
> root
>  |-- id: string (nullable = false)
>  |-- employee: array (nullable = false)
>  ||-- element: struct (containsNull = true)
>  |||-- id: string (nullable = false)
>  |||-- name: string (nullable = false)
>  |||-- speciality: string (nullable = false)
> 
> 
> select id from member where employee.name = 'employee0'
> 
> Uploaded a test if some one want to try it out. NestedObjectTest.java
> 
>   
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/nested-collection-object-query-tp24853.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> 

  

Re: SparkContext._active_spark_context returns None

2015-09-28 Thread Ted Yu
>>> sc._jvm.java.lang.Integer.valueOf("12")
12

FYI

On Mon, Sep 28, 2015 at 8:08 PM, YiZhi Liu  wrote:

> Hi,
>
> I'm doing some data processing on pyspark, but I failed to reach JVM
> in workers. Here is what I did:
>
> $ bin/pyspark
> >>> data = sc.parallelize(["123", "234"])
> >>> numbers = data.map(lambda s: SparkContext._active_spark_context._jvm.java.lang.Integer.valueOf(s.strip()))
> >>> numbers.collect()
>
> I got,
>
> Caused by: org.apache.spark.api.python.PythonException: Traceback
> (most recent call last):
>   File
> "/mnt/hgfs/lewis/Workspace/source-codes/spark/python/lib/pyspark.zip/pyspark/worker.py",
> line 111, in main
> process()
>   File
> "/mnt/hgfs/lewis/Workspace/source-codes/spark/python/lib/pyspark.zip/pyspark/worker.py",
> line 106, in process
> serializer.dump_stream(func(split_index, iterator), outfile)
>   File
> "/mnt/hgfs/lewis/Workspace/source-codes/spark/python/lib/pyspark.zip/pyspark/serializers.py",
> line 263, in dump_stream
> vs = list(itertools.islice(iterator, batch))
>   File "<stdin>", line 1, in <lambda>
> AttributeError: 'NoneType' object has no attribute '_jvm'
>
> at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
> at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
> at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> ... 1 more
>
> While _jvm at the driver end looks fine:
>
> >>> SparkContext._active_spark_context._jvm.java.lang.Integer.valueOf("123".strip())
> 123
>
> The program is trivial, I just wonder what is the right way to reach
> JVM in python. Any help would be appreciated.
>
> Thanks
>
> --
> Yizhi Liu
> Senior Software Engineer / Data Mining
> www.mvad.com, Shanghai, China
>
>
>


Re: Spark SQL: Implementing Custom Data Source

2015-09-28 Thread Ted Yu
See this thread:

http://search-hadoop.com/m/q3RTttmiYDqGc202

And:

http://spark.apache.org/docs/latest/sql-programming-guide.html#data-sources

> On Sep 28, 2015, at 8:22 PM, Jerry Lam  wrote:
> 
> Hi spark users and developers,
> 
> I'm trying to learn how to implement a custom data source for Spark SQL. Is 
> there any documentation that I can use as a reference? I'm not sure exactly 
> what needs to be extended/implemented. A general workflow would be greatly 
> helpful!
> 
> Best Regards,
> 
> Jerry


Re: Setting executors per worker - Standalone

2015-09-28 Thread Jeff Zhang
Use "--executor-cores 1"; you will get 4 executors per worker since you have
4 cores per worker.
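
The arithmetic behind that, as a rough sketch: assuming the standalone master keeps placing executors on a worker while it still has enough free cores and enough free memory for one more, the count per worker is bounded by both resources. `executors_per_worker` is a hypothetical helper and the 2048 MB executor memory is a made-up value; other settings (for example a cap on total cores) are ignored here.

```python
def executors_per_worker(worker_cores, worker_mem_mb, executor_cores, executor_mem_mb):
    """Upper bound on the executors one worker can host for a single application."""
    return min(worker_cores // executor_cores, worker_mem_mb // executor_mem_mb)

# Figures from the thread: 4 cores and 12 GB per machine, 1 core per executor.
# The executor memory below is an assumption for illustration only.
n = executors_per_worker(4, 12 * 1024, 1, 2048)
```

With these numbers the cores are the limiting factor; with a larger executor memory the memory bound would take over.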



On Tue, Sep 29, 2015 at 8:24 AM, James Pirz  wrote:

> Hi,
>
> I am using Spark 1.5 (standalone mode) on a cluster with 10 nodes while
> each machine has 12GB of RAM and 4 cores. On each machine I have one worker
> which is running one executor that grabs all 4 cores. I am interested to
> check the performance with "one worker but 4 executors per machine - each
> with one core".
>
> I can see that "running multiple executors per worker in Standalone mode"
> is possible based on the closed issue:
>
> https://issues.apache.org/jira/browse/SPARK-1706
>
> But I can not find a way to do that. "SPARK_EXECUTOR_INSTANCES" is only
> available for the Yarn mode, and in the standalone mode I can just set
> "SPARK_WORKER_INSTANCES" and "SPARK_WORKER_CORES" and "SPARK_WORKER_MEMORY".
>
> Any hint or suggestion would be great.
>
>


-- 
Best Regards

Jeff Zhang


Re: Setting executors per worker - Standalone

2015-09-28 Thread James Pirz
Thanks for your reply.

Setting it as

--conf spark.executor.cores=1

when I start spark-shell (as an example application) indeed sets the number
of cores per executor to 1 (it was 4 before), but I still have 1 executor
per worker. What I am really looking for is having 1 worker with 4 executors
(each with one core) per machine when I run my application. Based on the
documentation it seems feasible, but it is not clear how.

Thanks.

On Mon, Sep 28, 2015 at 8:46 PM, Jeff Zhang  wrote:

> use "--executor-cores 1" you will get 4 executors per worker since you
> have 4 cores per worker
>
>
>
> On Tue, Sep 29, 2015 at 8:24 AM, James Pirz  wrote:
>
>> Hi,
>>
>> I am using Spark 1.5 (standalone mode) on a cluster with 10 nodes while
>> each machine has 12GB of RAM and 4 cores. On each machine I have one worker
>> which is running one executor that grabs all 4 cores. I am interested to
>> check the performance with "one worker but 4 executors per machine - each
>> with one core".
>>
>> I can see that "running multiple executors per worker in Standalone mode"
>> is possible based on the closed issue:
>>
>> https://issues.apache.org/jira/browse/SPARK-1706
>>
>> But I can not find a way to do that. "SPARK_EXECUTOR_INSTANCES" is only
>> available for the Yarn mode, and in the standalone mode I can just set
>> "SPARK_WORKER_INSTANCES" and "SPARK_WORKER_CORES" and "SPARK_WORKER_MEMORY".
>>
>> Any hint or suggestion would be great.
>>
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Fwd: "Method json([class java.util.HashMap]) does not exist" when reading JSON on PySpark

2015-09-28 Thread Fernando Paladini
Hello guys,

I'm very new to Spark and I'm having some trouble reading JSON into a
DataFrame on PySpark.

I'm getting a JSON object from an API response and I would like to store it
in Spark as a DataFrame (I've read that DataFrame is better than RDD; is
that accurate?). From what I've read in the documentation, I just need to
call the method sqlContext.read.json in order to do what I want.

*Following is the code from my test application:*
json_object = json.loads(response.text)
sc = SparkContext("local", appName="JSON to RDD")
sqlContext = SQLContext(sc)
dataframe = sqlContext.read.json(json_object)
dataframe.show()

*The problem is that when I run "spark-submit myExample.py" I get the
following error:*
15/09/29 01:18:54 INFO BlockManagerMasterEndpoint: Registering block
manager localhost:48634 with 530.0 MB RAM, BlockManagerId(driver,
localhost, 48634)
15/09/29 01:18:54 INFO BlockManagerMaster: Registered BlockManager
Traceback (most recent call last):
  File "/home/paladini/ufxc/lisha/learning/spark-api-kairos/test1.py", line
35, in <module>
dataframe = sqlContext.read.json(json_object)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line
144, in json
  File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
line 538, in __call__
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 36,
in deco
  File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line
304, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o21.json. Trace:
py4j.Py4JException: Method json([class java.util.HashMap]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
at py4j.Gateway.invoke(Gateway.java:252)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)

*What am I doing wrong?*
Check out this gist
to see the JSON I'm trying to load.

Thanks!
Fernando Paladini
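
A note on the error above: "Method json([class java.util.HashMap]) does not exist" suggests the reader was handed the parsed Python dict, whereas it expects a path to JSON data (or, depending on the Spark version, an RDD of JSON strings). One way around it is to pass JSON text rather than the parsed object. The sketch below is only an assumption about how that could look: `to_json_lines` and the payload are made up, and the Spark calls are left in comments so the snippet runs without a cluster.

```python
import json

def to_json_lines(obj):
    """Serialise a parsed JSON value back into one JSON string per record."""
    records = obj if isinstance(obj, list) else [obj]
    return [json.dumps(r) for r in records]

# Hypothetical payload standing in for json.loads(response.text).
json_object = {"name": "sensor-1", "values": [1, 2, 3]}
lines = to_json_lines(json_object)

# With Spark available, the strings (not the dict) would be handed over, e.g.:
#   rdd = sc.parallelize(lines)
#   dataframe = sqlContext.jsonRDD(rdd)   # Spark 1.x API; assumed to fit this version
```

If the API response is already a single JSON document, parallelizing [response.text] directly should work the same way without the round trip through json.loads.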


Re: GroupBy Java objects in Java Spark

2015-09-28 Thread Peter Bollerman
Hi,

You will want to make sure your key JavaObject implements equals() and
hashCode() appropriately. Otherwise you may not get the groupings you expect.

Peter Bollerman
Principal Software Engineer
The Allant Group, Inc.
630-718-3830
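
The same contract can be illustrated outside Java. Below is a small Python analogue, not the Java API: `__eq__` and `__hash__` play the role of equals() and hashCode(), and `RecordKey` and `group_by_key` are made-up names. Without the two methods, the two equal-looking keys would land in separate groups.

```python
from collections import defaultdict

class RecordKey:
    """Hypothetical key type; __eq__/__hash__ mirror equals()/hashCode()."""
    def __init__(self, artist, title):
        self.artist, self.title = artist, title

    def __eq__(self, other):
        return (isinstance(other, RecordKey)
                and (self.artist, self.title) == (other.artist, other.title))

    def __hash__(self):
        # Must agree with __eq__: equal keys produce equal hashes.
        return hash((self.artist, self.title))

def group_by_key(pairs):
    """Group values by key, similar in spirit to groupByKey on a pair RDD."""
    groups = defaultdict(list)
    for k, v in pairs:
        groups[k].append(v)
    return dict(groups)

pairs = [(RecordKey("a", "x"), 1), (RecordKey("a", "x"), 2), (RecordKey("b", "y"), 3)]
groups = group_by_key(pairs)
```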

On Thu, Sep 24, 2015 at 5:27 AM, Sabarish Sasidharan <
sabarish.sasidha...@manthan.com> wrote:

> By java class objects if you mean your custom Java objects, yes of course.
> That will work.
>
> Regards
> Sab
> On 24-Sep-2015 3:36 pm, "Ramkumar V"  wrote:
>
>> Hi,
>>
>> I want to know whether grouping by java class objects is possible or not
>> in java Spark.
>>
>> I have Tuple2< JavaObject, JavaObject>. i want to groupbyKey and then
>> i'll do some operations in values after grouping.
>>
>>
>> *Thanks*,
>> 
>>
>>


Spark streaming job filling a lot of data in local spark nodes

2015-09-28 Thread swetha

Hi,

I see a lot of data getting filled locally as shown below from my streaming
job. I have my checkpoint set to hdfs. But, I still see the following data
filling my local nodes. Any idea if I can make this stored in hdfs instead
of storing the data locally?

-rw-r--r--  1520 Sep 17 18:43 shuffle_23119_5_0.index
-rw-r--r--  1 180564255 Sep 17 18:43 shuffle_23129_2_0.data
-rw-r--r--  1 364850277 Sep 17 18:45 shuffle_23145_8_0.data
-rw-r--r--  1  267583750 Sep 17 18:46 shuffle_23105_4_0.data
-rw-r--r--  1  136178819 Sep 17 18:48 shuffle_23123_8_0.data
-rw-r--r--  1  159931184 Sep 17 18:48 shuffle_23167_8_0.data
-rw-r--r--  1520 Sep 17 18:49 shuffle_23315_7_0.index
-rw-r--r--  1520 Sep 17 18:50 shuffle_23319_3_0.index
-rw-r--r--  1   92240350 Sep 17 18:51 shuffle_23305_2_0.data
-rw-r--r--  1   40380158 Sep 17 18:51 shuffle_23323_6_0.data
-rw-r--r--  1  369653284 Sep 17 18:52 shuffle_23103_6_0.data
-rw-r--r--  1  371932812 Sep 17 18:52 shuffle_23125_6_0.data
-rw-r--r--  1   19857974 Sep 17 18:53 shuffle_23291_19_0.data
-rw-r--r--  1  55342005 Sep 17 18:53 shuffle_23305_8_0.data
-rw-r--r--  1   92920590 Sep 17 18:53 shuffle_23303_4_0.data


Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-job-filling-a-lot-of-data-in-local-spark-nodes-tp24846.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: Using Map and Basic Operators yield java.lang.ClassCastException (Parquet + Hive + Spark SQL 1.5.0 + Thrift)

2015-09-28 Thread Dominic Ricard
Thanks Cheng. We use the Thrift Server to connect to Spark SQL from a JDBC 
client. 

I finally found the solution. My issue was coming from my query, as I reference 
the column as a STRUCT instead of a MAP. Here's the fix:

Original Query with issue:
SELECT COUNT(*) FROM table WHERE queue_access_count.`a2x-spot`.accessed * 3 
/ duration < 1

Fixed Query:
SELECT COUNT(*) FROM table WHERE queue_access_count['a2x-spot'].accessed * 
3 / duration < 1

For reference, your question helped me find the fix:

Schema definition for column "queue_access_count":

| queue_access_count | map<string,struct<accessed:bigint,consumed:bigint>> |

Parquet Schema for map column:
optional group queue_access_count (MAP) {
  repeated group map (MAP_KEY_VALUE) {
    required binary key (UTF8);
    optional group value {
      optional int64 accessed;
      optional int64 consumed;
    }
  }
}

Thanks!

Dominic Ricard
Triton Digital

-Original Message-
From: Cheng Lian [mailto:lian.cs@gmail.com] 
Sent: Friday, September 25, 2015 1:41 PM
To: Dominic Ricard; user@spark.apache.org
Subject: Re: Using Map and Basic Operators yield java.lang.ClassCastException 
(Parquet + Hive + Spark SQL 1.5.0 + Thrift)

Thanks for the clarification. Could you please provide the full schema of your 
table and query plans of your query? You may obtain them via:

hiveContext.table("your_table").printSchema()

and

hiveContext.sql("your query").explain(extended = true)

You also mentioned "Thrift" in the subject, did you mean the Thrift server? Or 
maybe the Parquet files were written by parquet-thrift? Could you please also 
provide the full Parquet schema of the Parquet files you were reading? You may 
get the schema using the parquet-schema CLI tool:

$ parquet-schema 

Here you can find instructions of how to build parquet-tools, just in case you 
don't have it at hand: 
https://github.com/Parquet/parquet-mr/issues/321

If you don't want to bother building parquet-tools (which can be sometimes 
troublesome), you may also try this in spark-shell:

hiveContext.table("your_table").head(1)

Then you should be able to find the Parquet schema from Spark driver log 
(please make sure you enable INFO log).


Cheng

On 9/24/15 7:59 PM, Dominic Ricard wrote:
> No, those were just examples on how maps can look like. In my case, the 
> key-value is either there or not in the form of the later:
>
> {"key1":{"key2":"value"}}
>
> If key1 is present, then it will contain a tuple of key2:value, value being a 
> 'int'
>
> I guess, after some testing, that my problem is on how casting a Map value to 
> the primitives Float and Double are handled. Handling INT is all good but 
> float and double are causing the exception.
>
> Thanks.
>
> Dominic Ricard
> Triton Digital
>
> -Original Message-
> From: Cheng Lian [mailto:lian.cs@gmail.com]
> Sent: Thursday, September 24, 2015 5:47 PM
> To: Dominic Ricard; user@spark.apache.org
> Subject: Re: Using Map and Basic Operators yield java.lang.ClassCastException 
> (Parquet + Hive + Spark SQL 1.5.0 + Thrift)
>
>
>
> On 9/24/15 11:34 AM, Dominic Ricard wrote:
>> Hi,
>>  I stumbled on the following today. We have Parquet files that
>> expose a column in a Map format. This is very convenient as we have
>> data parts that can vary in time. Not knowing what the data will be,
>> we simply split it in tuples and insert it as a map inside 1 column.
>>
>> Retrieving the data is very easy. Syntax looks like this:
>>
>> select column.key1.key2 from table;
>>
>> Column value look like this:
>> {}
>> {"key1":"value"}
>> {"key1":{"key2":"value"}}
> Do you mean that the value type of the map may also vary? The 2nd record has 
> a string value, while the 3rd one has another nested map as its value. This 
> isn't supported in Spark SQL.
>> But when trying to do basic operators on that column, I get the
>> following
>> error:
>>
>> query: select (column.key1.key2 / 30 < 1) from table
>>
>> ERROR processing query/statement. Error Code: 0, SQL state:
>> TStatus(statusCode:ERROR_STATUS,
>> infoMessages:[*org.apache.hive.service.cli.HiveSQLException:java.lang.ClassCastException:
>> org.apache.spark.sql.types.NullType$ cannot be cast to
>> org.apache.spark.sql.types.MapType:26:25,
>> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation:
>> runInternal:SparkExecuteStatementOperation.scala:259,
>> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation:
>> run:SparkExecuteStatementOperation.scala:144,
>> org.apache.hive.service.cli.session.HiveSessionImpl:executeStatementIn
>> ternal:HiveSessionImpl.java:388,
>> org.apache.hive.service.cli.session.HiveSessionImpl:executeStatement:H
>> iveSessionImpl.java:369,
>> sun.reflect.GeneratedMethodAccessor115:invoke::-1,
>> sun.reflect.DelegatingMethodAccessorImpl:invoke:DelegatingMethodAccess
>> orImpl.java:43, java.lang.reflect.Method:invoke:Method.java:497,
>> org.apache.hive.service.cli.session.HiveSessionProxy:invoke:HiveSessio

Re: CassandraSQLContext throwing NullPointer Exception

2015-09-28 Thread Priya Ch
Ted,

I am using spark 1.3.0 and running the code in YARN mode.

Here is the code..

object streaming {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.setMaster("yarn-client")
    conf.setAppName("SimpleApp")
    conf.set("spark.cassandra.connection.host", "")

    val sc = new SparkContext(conf)
    val sqlContext = new CassandraSQLContext(sc)

    val dstream = KafkaUtils.createStream()
    val words = dstream.flatMap(line => line.split(","))
    val wordPairs = words.map(word => (word, 1))
    val reducedStream = wordPairs.reduceByKey((a, b) => a + b)

    reducedStream.foreachRDD { rdd =>
      // Note: sqlContext is referenced inside rdd.foreach, i.e. on the executors.
      rdd.foreach { case (key, value) =>
        val df = getDataFrame(sqlContext, key)
        df.save("org.apache.spark.sql.cassandra", SaveMode.Overwrite,
          Map("c_table" -> "table1", "keyspace" -> "test"))
      }
    }
  }

  def getDataFrame(cqlContext: CassandraSQLContext, key: String): DataFrame =
    cqlContext.sql("select word,count from wordcount where word = '" + key + "'")
}

In the above code, the method getDataFrame is throwing Null Pointer
Exception at cqlContext.sql line.

On Mon, Sep 28, 2015 at 6:54 PM, Ted Yu  wrote:

> Which Spark release are you using ?
>
> Can you show the snippet of your code around CassandraSQLContext#sql() ?
>
> Thanks
>
> On Mon, Sep 28, 2015 at 6:21 AM, Priya Ch 
> wrote:
>
>> Hi All,
>>
>>  I am trying to use dataframes (which contain data from cassandra) in
>> rdd.foreach. This is throwing the following exception:
>>
>> Is CassandraSQLContext accessible within an executor?
>>
>> 15/09/28 17:22:40 ERROR JobScheduler: Error running job streaming job
>> 144344116 ms.0
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
>> in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage
>> 6.0 (TID 83, blrrndnbudn05.rnd.amadeus.net):
>> java.lang.NullPointerException
>> at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:134)
>> at
>> org.apache.spark.sql.cassandra.CassandraSQLContext.cassandraSql(CassandraSQLContext.scala:74)
>> at
>> org.apache.spark.sql.cassandra.CassandraSQLContext.sql(CassandraSQLContext.scala:77)
>> at
>> com.amadeus.spark.example.SparkDemo$.withDataFrames(SparkDemo.scala:170)
>> at
>> com.amadeus.spark.example.SparkDemo$$anonfun$main$1$$anonfun$apply$1.apply(SparkDemo.scala:158)
>> at
>> com.amadeus.spark.example.SparkDemo$$anonfun$main$1$$anonfun$apply$1.apply(SparkDemo.scala:158)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
>> at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>> at scala.Option.foreach(Option.scala:236)
>> at
>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>
>>
>


Re: SQL queries in Spark / YARN

2015-09-28 Thread Mark Hamstra
Yes.

On Mon, Sep 28, 2015 at 12:46 PM, Robert Grandl 
wrote:

> Hi guys,
>
> I was wondering if it's possible to submit SQL queries to Spark SQL, when
> Spark is running atop YARN instead of standalone mode.
>
> Thanks,
> Robert
>


Re: SQL queries in Spark / YARN

2015-09-28 Thread Kartik Mathur
Hey Robert, you could use Zeppelin instead if you don't want to use beeline.

On Monday, September 28, 2015, Robert Grandl 
wrote:

> Thanks Mark. Do you know how ? In Spark standalone mode I use beeline to
> submit SQL scripts.
>
> In Spark/YARN, the only way I can see this will work is using
> spark-submit. However as it looks, I need to encapsulate the SQL queries in
> a Scala file. Do you have other suggestions ?
>
> Thanks,
> Robert
>
>
>
> On Monday, September 28, 2015 2:47 PM, Mark Hamstra <m...@clearstorydata.com> wrote:
>
>
> Yes.
>
> On Mon, Sep 28, 2015 at 12:46 PM, Robert Grandl wrote:
>
> Hi guys,
>
> I was wondering if it's possible to submit SQL queries to Spark SQL, when
> Spark is running atop YARN instead of standalone mode.
>
> Thanks,
> Robert
>
>
>
>
>


Re: HDP 2.3 support for Spark 1.5.x

2015-09-28 Thread Krishna Sankar
Thanks Guys. Yep, now I would install 1.5.1 over HDP 2.3, if that works.
Cheers


On Mon, Sep 28, 2015 at 9:47 AM, Ted Yu  wrote:

> Krishna:
> If you want to query ORC files, see the following JIRA:
>
> [SPARK-10623] [SQL] Fixes ORC predicate push-down
>
> which is in the 1.5.1 release.
>
> FYI
>
> On Mon, Sep 28, 2015 at 9:42 AM, Fabien Martin 
> wrote:
>
>> Hi Krishna,
>>
>>- Take a look at
>> http://hortonworks.com/hadoop-tutorial/apache-spark-1-4-1-technical-preview-with-hdp/
>>- Or you can specify your 1.5.x jar as the Spark one using something
>>like :
>>
>> --conf
>> "spark.yarn.jar=hdfs://master:8020/spark-assembly-1.5.0-hadoop2.6.0.jar"
>>
>> The main drawback is :
>>
>> *Known Issues*
>>
>> *Spark YARN ATS integration does not work in this tech preview. You will
>> not see the history of Spark jobs in the Jobs server after a job is
>> finished.*
>>
>> 2015-09-23 1:31 GMT+02:00 Zhan Zhang :
>>
>>> Hi Krishna,
>>>
>>> For the time being, you can download from upstream, and it should be
>>> running OK for HDP2.3.  For hdp specific problem, you can ask in
>>> Hortonworks forum.
>>>
>>> Thanks.
>>>
>>> Zhan Zhang
>>>
>>> On Sep 22, 2015, at 3:42 PM, Krishna Sankar  wrote:
>>>
>>> Guys,
>>>
>>>- We have HDP 2.3 installed just now. It comes with Spark 1.3.x. The
>>>current wisdom is that it will support the 1.4.x train (which is good,
>>>need DataFrame et al).
>>>- What is the plan to support Spark 1.5.x ? Can we install 1.5.0 on
>>>HDP 2.3 ? Or will Spark 1.5.x support be in HDP 2.3.x and if so ~when ?
>>>
>>> Cheers & Thanks
>>> 
>>>
>>>
>>>
>>
>


SQL queries in Spark / YARN

2015-09-28 Thread Robert Grandl
Hi guys,
I was wondering if it's possible to submit SQL queries to Spark SQL, when Spark 
is running atop YARN instead of standalone mode. 

Thanks,
Robert

UnknownHostException with Mesos and custom Jar

2015-09-28 Thread Stephen Hankinson
Hi,

Wondering if anyone can help me with the issue I am having.

I am receiving an UnknownHostException when running a custom jar with Spark
on Mesos. The issue does not happen when running spark-shell.

My spark-env.sh contains the following:

export MESOS_NATIVE_JAVA_LIBRARY=/usr/local/lib/libmesos.so

export HADOOP_CONF_DIR=/hadoop-2.7.1/etc/hadoop/

My spark-defaults.conf contains the following:

spark.master   mesos://zk://172.31.0.81:2181,172.31.16.81:2181,172.31.32.81:2181/mesos

spark.mesos.executor.home  /spark-1.5.0-bin-hadoop2.6/

Starting spark-shell as follows and running the following line works
correctly:

/spark-1.5.0-bin-hadoop2.6/bin/spark-shell

sc.textFile("/tmp/Input").collect.foreach(println)

15/09/28 20:04:49 INFO storage.MemoryStore: ensureFreeSpace(88528) called
with curMem=0, maxMem=556038881

15/09/28 20:04:49 INFO storage.MemoryStore: Block broadcast_0 stored as
values in memory (estimated size 86.5 KB, free 530.2 MB)

15/09/28 20:04:49 INFO storage.MemoryStore: ensureFreeSpace(20236) called
with curMem=88528, maxMem=556038881

15/09/28 20:04:49 INFO storage.MemoryStore: Block broadcast_0_piece0 stored
as bytes in memory (estimated size 19.8 KB, free 530.2 MB)

15/09/28 20:04:49 INFO storage.BlockManagerInfo: Added broadcast_0_piece0
in memory on 172.31.21.104:49048 (size: 19.8 KB, free: 530.3 MB)

15/09/28 20:04:49 INFO spark.SparkContext: Created broadcast 0 from
textFile at <console>:22

15/09/28 20:04:49 INFO mapred.FileInputFormat: Total input paths to process
: 1

15/09/28 20:04:49 INFO spark.SparkContext: Starting job: collect at
<console>:22

15/09/28 20:04:49 INFO scheduler.DAGScheduler: Got job 0 (collect at
<console>:22) with 3 output partitions

15/09/28 20:04:49 INFO scheduler.DAGScheduler: Final stage: ResultStage
0(collect at <console>:22)

15/09/28 20:04:49 INFO scheduler.DAGScheduler: Parents of final stage:
List()

15/09/28 20:04:49 INFO scheduler.DAGScheduler: Missing parents: List()

15/09/28 20:04:49 INFO scheduler.DAGScheduler: Submitting ResultStage 0
(MapPartitionsRDD[1] at textFile at <console>:22), which has no missing
parents

15/09/28 20:04:49 INFO storage.MemoryStore: ensureFreeSpace(3120) called
with curMem=108764, maxMem=556038881

15/09/28 20:04:49 INFO storage.MemoryStore: Block broadcast_1 stored as
values in memory (estimated size 3.0 KB, free 530.2 MB)

15/09/28 20:04:49 INFO storage.MemoryStore: ensureFreeSpace(1784) called
with curMem=111884, maxMem=556038881

15/09/28 20:04:49 INFO storage.MemoryStore: Block broadcast_1_piece0 stored
as bytes in memory (estimated size 1784.0 B, free 530.2 MB)

15/09/28 20:04:49 INFO storage.BlockManagerInfo: Added broadcast_1_piece0
in memory on 172.31.21.104:49048 (size: 1784.0 B, free: 530.3 MB)

15/09/28 20:04:49 INFO spark.SparkContext: Created broadcast 1 from
broadcast at DAGScheduler.scala:861

15/09/28 20:04:49 INFO scheduler.DAGScheduler: Submitting 3 missing tasks
from ResultStage 0 (MapPartitionsRDD[1] at textFile at <console>:22)

15/09/28 20:04:49 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0
with 3 tasks

15/09/28 20:04:49 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
0.0 (TID 0, ip-172-31-37-82.us-west-2.compute.internal, NODE_LOCAL, 2142
bytes)

15/09/28 20:04:49 INFO scheduler.TaskSetManager: Starting task 1.0 in stage
0.0 (TID 1, ip-172-31-21-104.us-west-2.compute.internal, NODE_LOCAL, 2142
bytes)

15/09/28 20:04:49 INFO scheduler.TaskSetManager: Starting task 2.0 in stage
0.0 (TID 2, ip-172-31-4-4.us-west-2.compute.internal, NODE_LOCAL, 2142
bytes)

15/09/28 20:04:52 INFO storage.BlockManagerMasterEndpoint: Registering
block manager ip-172-31-4-4.us-west-2.compute.internal:50648 with 530.3 MB
RAM, BlockManagerId(20150928-190245-1358962604-5050-11297-S2,
ip-172-31-4-4.us-west-2.compute.internal, 50648)

15/09/28 20:04:52 INFO storage.BlockManagerMasterEndpoint: Registering
block manager ip-172-31-37-82.us-west-2.compute.internal:52624 with 530.3
MB RAM, BlockManagerId(20150928-190245-1358962604-5050-11297-S1,
ip-172-31-37-82.us-west-2.compute.internal, 52624)

15/09/28 20:04:52 INFO storage.BlockManagerMasterEndpoint: Registering
block manager ip-172-31-21-104.us-west-2.compute.internal:56628 with 530.3
MB RAM, BlockManagerId(20150928-190245-1358962604-5050-11297-S0,
ip-172-31-21-104.us-west-2.compute.internal, 56628)

15/09/28 20:04:52 INFO storage.BlockManagerInfo: Added broadcast_1_piece0
in memory on ip-172-31-37-82.us-west-2.compute.internal:52624 (size: 1784.0
B, free: 530.3 MB)

15/09/28 20:04:52 INFO storage.BlockManagerInfo: Added broadcast_1_piece0
in memory on ip-172-31-21-104.us-west-2.compute.internal:56628 (size:
1784.0 B, free: 530.3 MB)

15/09/28 20:04:52 INFO storage.BlockManagerInfo: Added broadcast_1_piece0
in memory on ip-172-31-4-4.us-west-2.compute.internal:50648 (size: 1784.0
B, free: 530.3 MB)

15/09/28 20:04:52 INFO storage.BlockManagerInfo: Added broadcast_0_piece0
in memory on ip-172-31-37-82.us-west-2.compute.internal:52624 (size: 19.8
KB, free: 530.3 

Adding / Removing worker nodes for Spark Streaming

2015-09-28 Thread Augustus Hong
Hey all,

I'm evaluating using Spark Streaming with Kafka direct streaming, and I
have a couple of questions:

1.  Would it be possible to add / remove worker nodes without stopping and
restarting the spark streaming driver?

2.  I understand that we can enable checkpointing to recover from node
failures, and that it doesn't work across code changes.  What about in the
event that worker nodes failed due to load -> we added more worker nodes ->
restart Spark Streaming?  Would this incur data loss as well?


Best,
Augustus

-- 
Augustus Hong
 Data Analytics | Branch Metrics
 m 650-391-3369 | e augus...@branch.io


Re: Strange shuffle behaviour difference between Zeppelin and Spark-shell

2015-09-28 Thread Kartik Mathur
Hey Rick,
Not sure about this, but a similar situation happened to me: when starting
spark-shell, it was starting a new cluster instead of using the existing
cluster, and this new cluster was a single-node cluster. That's why jobs
were taking forever to complete from spark-shell and were running much
faster using submit (which reads the conf correctly), or Zeppelin for that
matter.

Thanks,
Kartik

On Sun, Sep 27, 2015 at 11:45 PM, Rick Moritz  wrote:

> I've finally been able to pick this up again, after upgrading to Spark
> 1.4.1, because my code used the HiveContext, which runs fine in the REPL
> (be it via Zeppelin or the shell) but won't work with spark-submit.
> With 1.4.1, I have actually managed to get a result with the Spark shell,
> but after
> 3847,802237 seconds and in particular the last stage took 1320,672 seconds.
> This was after I used coalesce to balance the workload initially, since a
> Hive filter I applied normally would make for a skewed distribution of the
> data onto the nodes.
> Nonetheless, the same code (even without the coalesce) would work much
> faster in Zeppelin (around 1200 seconds with 1.4.0) and as a spark-submit
> job, the run time was just a tenth at
> 446,657534 seconds for the entire job and notably 38,961 seconds for the
> final stage.
>
> Again, there is a huge difference in the amount of data that gets
> shuffled/spilled (which leads to much earlier OOM-conditions), when using
> spark-shell.
> What could be the reason for this different behaviour using very similar
> configurations and identical data, machines and code (identical DAGs and
> sources) and identical spark binaries? Why would code launched from
> spark-shell generate more shuffled data for the same number of shuffled
> tuples?
>
> An analysis would be much appreciated.
>
> Best,
>
> Rick
>
> On Wed, Aug 19, 2015 at 2:47 PM, Rick Moritz  wrote:
>
>> oops, forgot to reply-all on this thread.
>>
>> -- Forwarded message --
>> From: Rick Moritz 
>> Date: Wed, Aug 19, 2015 at 2:46 PM
>> Subject: Re: Strange shuffle behaviour difference between Zeppelin and
>> Spark-shell
>> To: Igor Berman 
>>
>>
>> Those values are not explicitly set, and attempting to read their values
>> results in 'java.util.NoSuchElementException: spark.shuffle.spill.compress'.
>> What I mean by the volume per element being larger is illustrated in my
>> original post: for each case the number of elements is identical, but the
>> volume of data required to obtain/manage these elements is many times
>> greater.
>>
>> The only difference used to be that Zeppelin had FAIR scheduling over
>> FIFO scheduling for spark-shell. I just verified that spark-shell with FAIR
>> scheduling makes no difference. The only other difference in the
>> environment lies in some class-path variables which should only affect
>> method availability, not actual usage.
>>
>> Another fact to note: Spark assembly (1.4.0-rc4) was built with provided
>> hadoop dependencies (build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0
>> -Phadoop-provided -Phive -Phive-thriftserver -Psparkr -DskipTests clean
>> package) for 2.6.0 from Hortonworks, while Zeppelin was built with
>> dependencies against 2.6.0 from Maven central.
>>
>> On Wed, Aug 19, 2015 at 2:08 PM, Igor Berman 
>> wrote:
>>
>>> so what your case for version differences?
>>> what do u mean by  "in spark-shell the volume per element is much
>>> larger"
>>> can you verify that configuration in spark ui (under Environment tab is
>>> same).
>>> if you suspect compression then check following properties:
>>> spark.shuffle.compress
>>> spark.shuffle.spill.compress
>>> spark.io.compression.codec
>>> spark.rdd.compress
>>>
>>>
>>>
>>> On 19 August 2015 at 15:03, Rick Moritz  wrote:
>>>
>>>> Number of partitions and even size look relatively similar - except in
>>>> spark-shell the volume per element is much larger, especially in later
>>>> stages. That's when shuffles start to spill. Zeppelin creates almost no
>>>> spills at all. The number of elements per partition are the same for both
>>>> setups, but with very different data volume in/out. Almost as though
>>>> compression was used in one case, and not in another, or as though
>>>> shuffling is somehow less specific, and more nodes get data that they
>>>> ultimately don't process at all. The same shuffling algorithm appears to be
>>>> at work in each case, if the partitioning of the number of elements is
>>>> anything to go by.
>>>>
>>>> On Wed, Aug 19, 2015 at 1:58 PM, Igor Berman
>>>> wrote:
>>>>
>>>>> i would compare spark ui metrics for both cases and see any
>>>>> differences(number of partitions, number of spills etc)
>>>>> why can't you make repl to be consistent with Zeppelin spark version?
>>>>>  might be rc has issues...
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On 19 August 2015 at 14:42, Rick Moritz

Re: Strange shuffle behaviour difference between Zeppelin and Spark-shell

2015-09-28 Thread Kartik Mathur
OK, that might be possible. To confirm it, you can explicitly specify the
serializer in both cases (by setting spark.serializer, I guess). Then you
can be sure that the same serializer is used in both, and maybe do an
analysis from there.
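
A minimal sketch of pinning the serializer explicitly, assuming a small
standalone Scala application submitted with spark-submit (which supplies the
master). The object name and application name are hypothetical. For
spark-shell or Zeppelin the same key would be set through their own
configuration rather than in code.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical test application; the names are placeholders.
object SerializerCheck {
  def main(args: Array[String]): Unit = {
    // Pin the serializer so every launch path is known to use the same one.
    val conf = new SparkConf()
      .setAppName("serializer-check")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)

    // Reading the key with a default avoids the NoSuchElementException that a
    // plain get() throws when the key was never set explicitly.
    println(sc.getConf.get("spark.serializer", "<not set>"))

    sc.stop()
  }
}
```

Printing the effective value in each environment is one way to check whether
the shell and Zeppelin really agree before comparing shuffle volumes.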

Best,
Kartik

On Mon, Sep 28, 2015 at 11:38 AM, Rick Moritz  wrote:

> Hi Kartik,
>
> Thanks for the input!
>
> Sadly, that's not it - I'm using YARN - the configuration looks identical,
> and the nodes/memory/cores are deployed identically and exactly as
> specified.
>
> My current hunch, is that for some reason different serializers are used
> in each case, but I can find no documentation on why that could be the
> case, and the configuration isn't indicative of that either.
> Nonetheless, the symptom of different shuffle volume for same shuffle
> number of tuples could well point to that as source of my issue.
> In fact, a colleague pointed out that HIS (Cloudera) installation was
> defaulting to kryo for the spark-shell, which had an impact for some jobs.
> I couldn't find the document he was referring to as a source of this
> information, but the behavior sounds plausible at least.
>
> Best,
>
> Rick
>
>
> On Mon, Sep 28, 2015 at 8:24 PM, Kartik Mathur 
> wrote:
>
>> Hey Rick ,
>> Not sure on this but similar situation happened with me, when starting
>> spark-shell it was starting a new cluster instead of using the existing
>> cluster and this new cluster was a single node cluster , that's why jobs
>> were taking forever to complete from spark-shell and were running much
>> faster using submit (which reads conf correctly) or zeppelin for that
>> matter.
>>
>> Thanks,
>> Kartik
>>
>> On Sun, Sep 27, 2015 at 11:45 PM, Rick Moritz  wrote:
>>
>>> I've finally been able to pick this up again, after upgrading to Spark
>>> 1.4.1, because my code used the HiveContext, which runs fine in the REPL
>>> (be it via Zeppelin or the shell) but won't work with spark-submit.
>>> With 1.4.1, I have actually managed to get a result with the Spark shell,
>>> but after
>>> 3847,802237 seconds and in particular the last stage took 1320,672
>>> seconds.
>>> This was after I used coalesce to balance the workload initially, since a
>>> Hive filter I applied normally would make for a skewed distribution of the
>>> data onto the nodes.
>>> Nonetheless, the same code (even without the coalesce) would work much
>>> faster in Zeppelin (around 1200 seconds with 1.4.0) and as a spark-submit
>>> job, the run time was just a tenth at
>>> 446,657534 seconds for the entire job and notably 38,961 seconds for the
>>> final stage.
>>>
>>> Again, there is a huge difference in the amount of data that gets
>>> shuffled/spilled (which leads to much earlier OOM-conditions), when using
>>> spark-shell.
>>> What could be the reason for this different behaviour using very similar
>>> configurations and identical data, machines and code (identical DAGs and
>>> sources) and identical spark binaries? Why would code launched from
>>> spark-shell generate more shuffled data for the same number of shuffled
>>> tuples?
>>>
>>> An analysis would be much appreciated.
>>>
>>> Best,
>>>
>>> Rick
>>>
>>> On Wed, Aug 19, 2015 at 2:47 PM, Rick Moritz  wrote:
>>>
>>>> oops, forgot to reply-all on this thread.
>>>>
>>>> -- Forwarded message --
>>>> From: Rick Moritz
>>>> Date: Wed, Aug 19, 2015 at 2:46 PM
>>>> Subject: Re: Strange shuffle behaviour difference between Zeppelin and
>>>> Spark-shell
>>>> To: Igor Berman
>>>>
>>>>
>>>> Those values are not explicitly set, and attempting to read their
>>>> values results in 'java.util.NoSuchElementException:
>>>> spark.shuffle.spill.compress'.
>>>> What I mean by the volume per element being larger is illustrated in my
>>>> original post: for each case the number of elements is identical, but the
>>>> volume of data required to obtain/manage these elements is many times
>>>> greater.
>>>>
>>>> The only difference used to be that Zeppelin had FAIR scheduling over
>>>> FIFO scheduling for spark-shell. I just verified that spark-shell with FAIR
>>>> scheduling makes no difference. The only other difference in the
>>>> environment lies in some class-path variables which should only affect
>>>> method availability, not actual usage.
>>>>
>>>> Another fact to note: Spark assembly (1.4.0-rc4) was built with
>>>> provided hadoop dependencies (build/mvn -Pyarn -Phadoop-2.6
>>>> -Dhadoop.version=2.6.0 -Phadoop-provided -Phive -Phive-thriftserver
>>>> -Psparkr -DskipTests clean package) for 2.6.0 from Hortonworks, while
>>>> Zeppelin was built with dependencies against 2.6.0 from Maven central.
>>>>
>>>> On Wed, Aug 19, 2015 at 2:08 PM, Igor Berman
>>>> wrote:
>>>>
>>>>> so what your case for version differences?
>>>>> what do u mean by  "in spark-shell the volume per element is much
>>>>> larger"
>>>>> can you verify that configuration in 

Re: Python script runs fine in local mode, errors in other modes

2015-09-28 Thread Aaron
Hi! Yeah, the problem was inconsistent versions of python across the cluster. I 
was launching from a node with Python 2.7.9, but the job ran on nodes with 
2.6.6.

From: "devmacrile [via Apache Spark User List]"
Date: Monday, September 28, 2015 at 1:35 PM
To: Aaron Dossett
Subject: Re: Python script runs fine in local mode, errors in other modes

Was there any eventual solution to this that you discovered?






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Python-script-runs-fine-in-local-mode-errors-in-other-modes-tp12390p24848.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: About memory leak in spark 1.4.1

2015-09-28 Thread Jon Chase
I'm seeing a similar (same?) problem on Spark 1.4.1 running on Yarn (Amazon
EMR, Java 8).  I'm running a Spark Streaming app 24/7 and system memory
eventually gets exhausted after about 3 days and the JVM process dies with:

#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 12288 bytes for committing
reserved memory.
# An error report file with more information is saved as:
#
/mnt/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1442933070871_0002/container_1442933070871_0002_01_02/hs_err_pid19082.log
[thread 139846843156224 also had an error]


To reiterate what Sea said, the heap is fine, this is NOT a heap memory
issue - I've monitored it with scripts and also observed it via VisualVm -
this is an off heap issue.  I ran pmap on the pid of CoarseGrainedExecutor,
spaced about 5 hours apart, and saw several 64mb chunks of off heap memory
allocated in that time:


7fccd400  65500K rw---[ anon ]
7fccd7ff7000 36K -[ anon ]
7fccd800  65528K rw---[ anon ]
7fccdbffe000  8K -[ anon ]
7fccdc00  65504K rw---[ anon ]
7fccdfff8000 32K -[ anon ]
7fcce000  65536K rw---[ anon ]
7fcce400  65508K rw---[ anon ]
7fcce7ff9000 28K -[ anon ]
7fcce800  65524K rw---[ anon ]
7fccebffd000 12K -[ anon ]
7fccec00  65532K rw---[ anon ]
7fcce000  4K -[ anon ]
7fccf000  65496K rw---[ anon ]
7fccf3ff6000 40K -[ anon ]
7fccf400  65496K rw---[ anon ]
7fccf7ff6000 40K -[ anon ]
7fccf800  65532K rw---[ anon ]
7fccfbfff000  4K -[ anon ]
7fccfc00  65520K rw---[ anon ]
7fccc000 16K -[ anon ]
7fcd  65508K rw---[ anon ]
7fcd03ff9000 28K -[ anon ]

Over these 5 hours, total memory usage by the JVM (as reported by top) grew
by ~786mb, or basically the sum of those 13 64mb chunks.

I dumped the memory from /proc/pid/, and was able to see a bunch of lines
from the data files that my Spark job is processing, but I couldn't
figure out what was actually creating these 64mb chunks.   I thought it
might be netty so I set spark.shuffle.io.preferDirectBufs to false, but
that hasn't changed anything.

The only thing I see in the config page regarding "64mb" is
spark.kryoserializer.buffer.max, which defaults to 64mb.  I'll try setting
that to something different, but as far as I know, kryo is not doing
anything off heap.

Still wondering if this could be netty, or maybe something Akka is doing if
it's using off heap mem?

There were no ERROR messages in the executor's (or driver's) logs during
this time.

Any help would be greatly appreciated.  This issue continues to cause our
streaming apps to die every few days, which is... less than ideal! :)

On Wed, Aug 5, 2015 at 9:10 AM, Sea <261810...@qq.com> wrote:

> No one helped me, so I helped myself: I split the cluster into two clusters,
> 1.4.1 and 1.3.0.
>
>
> -- Original Message --
> *From:* "Ted Yu";;
> *Sent:* Tuesday, August 4, 2015, 10:28 PM
> *To:* "Igor Berman";
> *Cc:* "Sea"<261810...@qq.com>; "Barak Gitsis"; "
> user@spark.apache.org"; "rxin";
> "joshrosen"; "davies";
> *Subject:* Re: About memory leak in spark 1.4.1
>
> w.r.t. spark.deploy.spreadOut , here is the scaladoc:
>
>   // As a temporary workaround before better ways of configuring memory,
> we allow users to set
>   // a flag that will perform round-robin scheduling across the nodes
> (spreading out each app
>   // among all the nodes) instead of trying to consolidate each app onto a
> small # of nodes.
>   private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut",
> true)
>
> Cheers
>
> On Tue, Aug 4, 2015 at 4:13 AM, Igor Berman  wrote:
>
>> sorry, can't disclose info about my prod cluster
>>
>> nothing jumps into my mind regarding your config
>> we don't use lz4 compression, don't know what is spark.deploy.spreadOut(there
>> is no documentation regarding this)
>>
>> If you are sure that you don't have memory leak in your business logic I
>> would try to reset each property to default(or just remove it from your
>> config) and try to run your job to see if it's not
>> somehow connected
>>
>> my config(nothing special really)
>> spark.shuffle.consolidateFiles true
>> spark.speculation false
>> spark.executor.extraJavaOptions -XX:+UseStringCache
>> -XX:+UseCompressedStrings -XX:+PrintGC -XX:+PrintGCDetails
>> -XX:+PrintGCTimeStamps -Xloggc:gc.log -verbose:gc
>> spark.executor.logs.rolling.maxRetainedFiles 1000
>> spark.executor.logs.rolling.strategy time
>> spark.worker.cleanup.enabled true
>> 

Re: how to handle OOMError from groupByKey

2015-09-28 Thread Alexis Gillain
"Note: As currently implemented, groupByKey must be able to hold all the
key-value pairs for any key in memory. If a key has too many values, it can
result in an [[OutOfMemoryError]]."

Obviously the set of values for one of your keys is too large. You can try to
increase spark.shuffle.memoryFraction.

Are you sure you can't:
partition your data by user/time-interval => process with mapPartitions =>
partition by user => process with mapPartitions
It's not efficient, but if your operation decreases the amount of data per
user it may work.
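
One way to read the partition-then-mapPartitions suggestion is as a secondary
sort: if the records of a partition arrive already ordered by user and then by
timestamp, the intervals can be produced in a single streaming pass without
holding all of a user's values in memory. The sketch below shows only that
per-partition step in plain Scala. The object name, `onIntervals`, and the
(user, ts, state) record layout are assumptions for illustration. In Spark,
one possible way to obtain the ordering is repartitionAndSortWithinPartitions
with a partitioner that looks at the user only; that wiring is assumed here
and not shown.

```scala
// Hypothetical helper: turns (user, ts, state) records that are already
// sorted by user and then by timestamp into (user, beginTs, endTs) intervals.
object Intervals {
  def onIntervals(
      sorted: Iterator[(String, Long, Boolean)]): Iterator[(String, Long, Long)] = {
    var currentUser: Option[String] = None
    var onSince: Option[Long] = None

    // Handles one record and returns an interval only when an "on" span closes.
    def step(user: String, ts: Long, state: Boolean): Option[(String, Long, Long)] = {
      // A new user starts: forget any interval left open by the previous user.
      if (currentUser != Some(user)) {
        currentUser = Some(user)
        onSince = None
      }
      (onSince, state) match {
        case (None, true) =>
          onSince = Some(ts) // off -> on: remember when the span began
          None
        case (Some(begin), false) =>
          onSince = None // on -> off: emit the finished span
          Some((user, begin, ts))
        case _ =>
          None // repeated on/on or off/off: nothing changes
      }
    }

    // Lazy single pass; only one record and two small vars are held at a time.
    sorted.flatMap { case (user, ts, state) => step(user, ts, state).iterator }
  }
}
```

Note that in this sketch an interval that is still open when a user's records
end is dropped; whether that is the right behaviour depends on the
application.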


2015-09-29 0:17 GMT+08:00 Fabien Martin :

> You can try to reduce the number of containers in order to increase their
> memory.
>
> 2015-09-28 9:35 GMT+02:00 Akhil Das :
>
>> You can try to increase the number of partitions to get rid of the OOM
>> errors. Also try to use reduceByKey instead of groupByKey.
>>
>> Thanks
>> Best Regards
>>
>> On Sat, Sep 26, 2015 at 1:05 AM, Elango Cheran 
>> wrote:
>>
>>> Hi everyone,
>>> I have an RDD of the format (user: String, timestamp: Long, state:
>>> Boolean).  My task involves converting the states, where on/off is
>>> represented as true/false, into intervals of 'on' of the format (beginTs:
>>> Long, endTs: Long).  So this task requires me, per user, to line up all of
>>> the on/off states so that I can compute when it is on, since the
>>> calculation is neither associative nor commutative.
>>>
>>> So there are 2 main operations that I'm trying to accomplish together:
>>> 1. group by each user
>>> 2. sort by time -- keep all of the states in sorted order by time
>>>
>>> The main code inside the method that does grouping by user and sorting
>>> by time looks sort of like this:
>>>
>>>
>>> // RDD starts off in format (user, ts, state) of type RDD[(String, Long,
>>> Boolean)]
>>> val grouped = keyedStatesRDD.groupByKey
>>> // after .groupByKey, format for RDD is (user, seq-of-(ts, state)) of
>>> type RDD[(String, Iterable[(Long, Boolean)])]
>>> // take the sequence of (ts, state) per user, sort, get intervals
>>> val groupedIntervals = grouped.mapValues(
>>>   states => {
>>> val sortedStates = states.toSeq.sortBy(_._1)
>>> val intervals = DFUtil.statesToIntervals(sortedStates)
>>> val intervalsList = bucketDurations.map{case(k,v) =>
>>> (k,v)}(collection.breakOut).sortBy(_._1)
>>> intervalsList
>>>   }
>>> )
>>> // after .mapValues, new format for RDD is (user, seq-of-(startTime,
>>> endTime)) of type RDD[(String, IndexedSeq[(Long, Long)])]
>>>
>>>
>>> When I run my Spark job with 1 day's worth of data, the job completes
>>> successfully.  When I run with 1 month's or 1 year's worth of data, that
>>> method is where my Spark job consistently crashes with
>>> OutOfMemoryErrors.  I need to run on the full year's worth of data.
>>>
>>> My suspicion is that the groupByKey is the problem (it's pulling all of
>>> the matching data values into a single executor's heap as a plain Scala
>>> Iterable).  But alternatives of doing sortByKey on the RDD first before
>>> grouping, or sortByKey followed by a fold[ByKey] or aggregate[ByKey] don't
>>> quite apply in my scenario because my operation is not associative (can't
>>> combine per-partition results) and I still need to group by users before
>>> doing a foldLeft.
>>>
>>> I've definitely thought about the issue before and come across users
>>> with issues that are similar but not exactly the same:
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html
>>>
>>> http://mail-archives.us.apache.org/mod_mbox/spark-user/201501.mbox/%3ccaae1cqr8rd8ypebcmbjwfhm+lxh6nw4+r+uharx00psk_sh...@mail.gmail.com%3E
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Partition-sorting-by-Spark-framework-td18213.html
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Alternatives-to-groupByKey-td20293.html
>>>
>>> And this Jira seems relevant too:
>>> https://issues.apache.org/jira/browse/SPARK-3655
>>>
>>> The amount of memory that I'm using is 2g per executor, and I can't go
>>> higher than that because each executor gets a YARN container from nodes
>>> with 16 GB of RAM and 5 YARN containers allowed per node.
>>>
>>> So I'd like to know if there's an easy solution to executing my logic on
>>> my full dataset in Spark.
>>>
>>> Thanks!
>>>
>>> -- Elango
>>>
>>
>>
>


-- 
Alexis GILLAIN


Performance when iterating over many parquet files

2015-09-28 Thread jwthomas
We are working with use cases where we need to do batch processing on a large
number (hundreds of thousands) of Parquet files.  The processing is quite
similar per file.  There are many aggregates that are very SQL-friendly
(computing averages, maxima, minima, aggregations on single columns with
some selection criteria).  There is also some processing that is more
advanced time-series processing (continuous wavelet transforms and the
like).  This all seems like a good use case for Spark.

But I'm having performance problems.  Let's take a look at something very
simple, which simply checks whether the parquet files are readable.

Code that seems natural but doesn't work:

import scala.util.{Try, Success, Failure}
val parquetFiles = sc.textFile("file_list.txt")
val successes = parquetFiles.map(x => (x, Try(sqlContext.read.parquet(x)))).filter(_._2.isSuccess).map(x => x._1)

My understanding is that this doesn't work because sqlContext can't be used
inside of a transformation like "map" (or inside an action).  That it only
makes sense in the driver.  Thus, it becomes a null reference in the above
code, so all reads fail.

Code that works:

import scala.util.{Try, Success, Failure}
val parquetFiles = sc.textFile("file_list.txt")
val successes = parquetFiles.collect().map(x => (x, Try(sqlContext.read.parquet(x)))).filter(_._2.isSuccess).map(x => x._1)


This works because the collect() means that everything happens back on the
driver.  So the sqlContext object makes sense and everything works fine.

But it is slow.  I'm using yarn-client mode on a 6-node cluster with 17
executors, 40 GB ram on driver, 19GB on executors.  And it takes about 1
minute to execute for 100 parquet files.  Which is too long.  Recall we need
to do this across hundreds of thousands of files.

I realize it is possible to parallelize after the read:

import scala.util.{Try, Success, Failure}
val parquetFiles = sc.textFile("file_list.txt")
val intermediate_successes = parquetFiles.collect().map(x => (x, Try(sqlContext.read.parquet(x))))
val dist_successes = sc.parallelize(intermediate_successes)
val successes = dist_successes.filter(_._2.isSuccess).map(x => x._1)


But this does not improve performance substantially.  It seems the
bottleneck is that the reads are happening sequentially.

Is there a better way to do this?

Thanks,
Jordan
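
One way to attack the sequential-read bottleneck described above is to keep
the reads on the driver but issue several of them at once from a thread pool.
The following is only a sketch under stated assumptions: readablePaths,
parallelism and the read parameter are names made up for illustration, and
read stands in for a driver-side call such as
path => sqlContext.read.parquet(path).  It uses plain Scala futures so that it
runs without Spark on the classpath; whether it helps on a given cluster would
need to be measured.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.util.Try

object ParallelReadCheck {
  // Returns the paths for which `read` completed without throwing.
  // `read` is a hypothetical stand-in for a driver-side call such as
  // `path => sqlContext.read.parquet(path)`.
  def readablePaths[A](paths: Seq[String], parallelism: Int)(read: String => A): Seq[String] = {
    val pool = Executors.newFixedThreadPool(parallelism)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // One future per path; Try captures a failed read instead of failing the future.
      val attempts = paths.map(p => Future((p, Try(read(p)))))
      // Future.sequence keeps the results in the same order as `paths`.
      Await.result(Future.sequence(attempts), Duration.Inf)
        .collect { case (p, t) if t.isSuccess => p }
    } finally pool.shutdown()
  }
}
```

With Spark, a call might look like
ParallelReadCheck.readablePaths(fileList, 16)(p => sqlContext.read.parquet(p)),
where 16 is an arbitrary pool size.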




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Performance-when-iterating-over-many-parquet-files-tp24850.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



java.lang.ClassCastException (org.apache.spark.scheduler.ResultTask cannot be cast to org.apache.spark.scheduler.Task)

2015-09-28 Thread amitra123
Hello All,

I am trying to write a very simple Spark Streaming example problem and I'm
getting this exception. I am new to Spark and I am not quite sure why this
exception is thrown. I am wondering if someone has any clues. Here is the
backtrace. I am running this on Spark 1.5.0.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in
stage 0.0 failed 4 times, most recent failure: Lost task 9.3 in stage 0.0
(TID 17, 112.XXX.XXX.XXX): java.lang.ClassCastException:
org.apache.spark.scheduler.ResultTask cannot be cast to
org.apache.spark.scheduler.Task
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:898)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:896)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:896)
at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:222)
at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:47)
at com.ay.spark.example.SparkWordCountStreaming.outputRdd(SparkWordCountStreaming.java:172)
at com.ay.spark.example.SparkWordCountStreaming.lambda$start$b852b88$1(SparkWordCountStreaming.java:123)
at com.ay.spark.example.SparkWordCountStreaming$$Lambda$9/1640832113.call(Unknown Source)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:315)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:315)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at 

Re: Performance when iterating over many parquet files

2015-09-28 Thread Michael Armbrust
Another note: for best performance you are going to want your parquet files
to be pretty big (hundreds of MB).  You could coalesce them and write them
out for more efficient repeat querying.
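
To make the coalescing suggestion concrete, one possible first step is to
group the small files into batches of roughly the target size, so that each
batch can be read together and rewritten as one larger file.  This is a
sketch, not a Spark API: batchBySize and targetBytes are hypothetical names,
and the (path, size) pairs are assumed to come from a file-system listing done
elsewhere.

```scala
object ParquetBatching {
  // Greedily pack (path, sizeInBytes) pairs, in the given order, into batches
  // whose total size stays at or below targetBytes where possible.
  // A single file larger than targetBytes ends up in a batch of its own.
  def batchBySize(files: Seq[(String, Long)], targetBytes: Long): Vector[Vector[String]] = {
    require(targetBytes > 0, "targetBytes must be positive")
    val (done, current, _) =
      files.foldLeft((Vector.empty[Vector[String]], Vector.empty[String], 0L)) {
        case ((batches, batch, size), (path, bytes)) =>
          if (batch.nonEmpty && size + bytes > targetBytes)
            (batches :+ batch, Vector(path), bytes) // close the batch, start a new one
          else
            (batches, batch :+ path, size + bytes)  // file still fits in the current batch
      }
    if (current.nonEmpty) done :+ current else done
  }
}
```

Each batch could then be passed to sqlContext.read.parquet(batch: _*) and
written back out after a coalesce.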

On Mon, Sep 28, 2015 at 2:00 PM, Michael Armbrust 
wrote:

> sqlContext.read.parquet takes lists of files.
>
> // this works, but using spark is possibly overkill
> val fileList = sc.textFile("file_list.txt").collect()
> val dataFrame = sqlContext.read.parquet(fileList: _*)
>
> On Mon, Sep 28, 2015 at 1:35 PM, jwthomas 
> wrote:
>
>> We are working with use cases where we need to do batch processing on a
>> large
>> number (hundreds of thousands) of Parquet files.  The processing is quite
>> similar per file.  There are a many aggregates that are very SQL-friendly
>> (computing averages, maxima, minima, aggregations on single columns with
>> some selection criteria).  There are also some processing that is more
>> advanced time-series processing (continuous wavelet transforms and the
>> like).  This all seems like a good use case for Spark.
>>
>> But I'm having performance problems.  Let's take a look at something very
>> simple, which simply checks whether the parquet files are readable.
>>
>> Code that seems natural but doesn't work:
>>
>> import scala.util.{Try, Success, Failure}
>> val parquetFiles = sc.textFile("file_list.txt")
>> val successes = parquetFiles.map(x => (x, Try(sqlContext.read.parquet(x)))).
>>   filter(_._2.isSuccess).map(x => x._1)
>>
>> My understanding is that this doesn't work because sqlContext can't be
>> used
>> inside of a transformation like "map" (or inside an action).  That it only
>> makes sense in the driver.  Thus, it becomes a null reference in the above
>> code, so all reads fail.
>>
>> Code that works:
>>
>> import scala.util.{Try, Success, Failure}
>> val parquetFiles = sc.textFile("file_list.txt")
>> val successes = parquetFiles.collect().map(x => (x, Try(sqlContext.read.parquet(x)))).
>>   filter(_._2.isSuccess).map(x => x._1)
>>
>>
>> This works because the collect() means that everything happens back on the
>> driver.  So the sqlContext object makes sense and everything works fine.
>>
>> But it is slow.  I'm using yarn-client mode on a 6-node cluster with 17
>> executors, 40 GB ram on driver, 19GB on executors.  And it takes about 1
>> minute to execute for 100 parquet files.  Which is too long.  Recall we
>> need
>> to do this across hundreds of thousands of files.
>>
>> I realize it is possible to parallelize after the read:
>>
>> import scala.util.{Try, Success, Failure}
>> val parquetFiles = sc.textFile("file_list.txt")
>> val intermediate_successes =
>>   parquetFiles.collect().map(x => (x, Try(sqlContext.read.parquet(x))))
>> val dist_successes = sc.parallelize(intermediate_successes)
>> val successes = dist_successes.filter(_._2.isSuccess).map(x => x._1)
>>
>>
>> But this does not improve performance substantially.  It seems the
>> bottleneck is that the reads are happening sequentially.
>>
>> Is there a better way to do this?
>>
>> Thanks,
>> Jordan
>>
>


Re: spark.mesos.coarse impacts memory performance on mesos

2015-09-28 Thread Utkarsh Sengar
Hi Tim,

1. spark.mesos.coarse:false (fine grain mode)
This is the data dump for config and executors assigned:
https://gist.github.com/utkarsh2012/6401d5526feccab14687

2. spark.mesos.coarse:true (coarse grain mode)
Dump for coarse mode:
https://gist.github.com/utkarsh2012/918cf6f8ed5945627188

As you can see, exactly the same code works fine in fine-grained mode but
runs out of memory in coarse-grained mode. First an executor was lost and
then the driver ran out of memory.
So I am trying to understand what is different between fine-grained and
coarse-grained mode, other than the allocation of multiple Mesos tasks vs.
one Mesos task. Clearly Spark is not managing memory in the same way.

Thanks,
-Utkarsh


On Fri, Sep 25, 2015 at 9:17 AM, Tim Chen  wrote:

> Hi Utkarsh,
>
> What is your job placement like when you run fine grain mode? You said
> coarse grain mode only ran with one node right?
>
> And when the job is running could you open the Spark webui and get stats
> about the heap size and other java settings?
>
> Tim
>
> On Thu, Sep 24, 2015 at 10:56 PM, Utkarsh Sengar 
> wrote:
>
>> Bumping this one up, any suggestions on the stacktrace?
>> spark.mesos.coarse=true is not working and the driver crashed with the
>> error.
>>
>> On Wed, Sep 23, 2015 at 3:29 PM, Utkarsh Sengar 
>> wrote:
>>
>>> I forgot to reply-all.
>>>
>>> Tim,
>>>
>>> spark.mesos.coarse = true doesn't work and spark.mesos.coarse = false
>>> works (sorry there was a typo in my last email, I meant "when I do
>>> "spark.mesos.coarse=false", the job works like a charm. ").
>>>
>>> I get this exception with spark.mesos.coarse = true:
>>>
>>> 15/09/22 20:18:05 INFO MongoCollectionSplitter: Created split: min={
>>> "_id" : "55af4bf26750ad38a444d7cf"}, max= { "_id" :
>>> "55af5a61e8a42806f47546c1"}
>>> 15/09/22 20:18:05 INFO MongoCollectionSplitter: Created split: min={
>>> "_id" : "55af5a61e8a42806f47546c1"}, max= null
>>> Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
>>> at org.apache.spark.rdd.CartesianRDD.getPartitions(CartesianRDD.scala:60)
>>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>>> at scala.Option.getOrElse(Option.scala:120)
>>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>>> at org.apache.spark.rdd.CartesianRDD.getPartitions(CartesianRDD.scala:60)
>>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>>> at scala.Option.getOrElse(Option.scala:120)
>>> at 

Re: Adding / Removing worker nodes for Spark Streaming

2015-09-28 Thread Sourabh Chandak
I have the same use case as Augustus, and some basic questions about
recovery from checkpoint. I have a 10-node Kafka cluster and a 30-node
Spark cluster running a streaming job. How is the (topic, partition) data
handled in checkpointing? The scenario I want to understand is: in case of
node failure, how will a new node know the checkpoint of the failed node?
The amount of data we have is huge, and we can't run from the smallest
offset.

Thanks,
Sourabh

On Mon, Sep 28, 2015 at 11:43 AM, Augustus Hong 
wrote:

> Got it, thank you!
>
>
> On Mon, Sep 28, 2015 at 11:37 AM, Cody Koeninger 
> wrote:
>
>> Losing worker nodes without stopping is definitely possible.  I haven't
>> had much success adding workers to a running job, but I also haven't spent
>> much time on it.
>>
>> If you're restarting with the same jar, you should be able to recover
>> from checkpoint without losing data (usual caveats apply, e.g. you need
>> enough kafka retention).  Make sure to test it though, as the code paths
>> taken during recovery from checkpoint are not the same as on initial
>> startup, and you can run into unexpected issues (e.g. authentication).
>>
>> On Mon, Sep 28, 2015 at 1:27 PM, Augustus Hong wrote:
>>
>>> Hey all,
>>>
>>> I'm evaluating using Spark Streaming with Kafka direct streaming, and I
>>> have a couple of questions:
>>>
>>> 1.  Would it be possible to add / remove worker nodes without stopping
>>> and restarting the spark streaming driver?
>>>
>>> 2.  I understand that we can enable checkpointing to recover from node
>>> failures, and that it doesn't work across code changes.  What about in the
>>> event that worker nodes failed due to load -> we added more worker nodes ->
>>> restart Spark Streaming?  Would this incur data loss as well?
>>>
>>>
>>> Best,
>>> Augustus
>>>
>>> --
>>> Augustus Hong
>>>  Data Analytics | Branch Metrics
>>>  m 650-391-3369 | e augus...@branch.io
>>>
>>
>>
>
>
> --
> Augustus Hong
>  Data Analytics | Branch Metrics
>  m 650-391-3369 | e augus...@branch.io
>


Reading kafka stream and writing to hdfs

2015-09-28 Thread Chengi Liu
Hi,
  I am going through this example:
https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/kafka_wordcount.py
What is the right way to write this data to HDFS?
Thanks


Re: Adding / Removing worker nodes for Spark Streaming

2015-09-28 Thread Cody Koeninger
If a node fails, the partition / offset range that it was working on will
be scheduled to run on another node.  This is generally true of spark,
regardless of checkpointing.

The offset ranges for a given batch are stored in the checkpoint for that
batch.  That's relevant if your entire job fails (driver failure, all
workers fail, etc).

If you really can't afford to run from the smallest offset and can't afford
to lose data, don't rely on spark checkpoints (because of the conditions
under which they can't be recovered).  Store the offset ranges yourself.
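
The advice to store the offset ranges yourself can be sketched as follows.
This is a minimal illustration under assumptions, not the Kafka integration's
own code: OffsetRange here is a locally defined case class that mirrors the
fields of Spark's class of the same name, and OffsetStore is a hypothetical
in-memory stand-in for whatever durable store (a database, ZooKeeper) is
actually used.

```scala
// Hypothetical local mirror of the fields carried by a Kafka offset range,
// defined here so the sketch runs without Spark on the classpath.
final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

// In-memory stand-in for a durable store (an assumption for illustration).
final class OffsetStore {
  private var committed = Map.empty[(String, Int), Long]

  // Record the end of each processed range. Intended to be called only
  // after the batch's output has been written successfully.
  def commit(ranges: Seq[OffsetRange]): Unit = synchronized {
    ranges.foreach { r =>
      val key = (r.topic, r.partition)
      // Never move backwards if an older batch is replayed.
      val next = math.max(committed.getOrElse(key, 0L), r.untilOffset)
      committed = committed.updated(key, next)
    }
  }

  // Offsets to resume from after a restart, per (topic, partition).
  def startingOffsets: Map[(String, Int), Long] = synchronized(committed)
}
```

In a real job the ranges would come from each batch's RDD via HasOffsetRanges,
and on restart the stored map would be converted into the fromOffsets argument
of KafkaUtils.createDirectStream.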


On Mon, Sep 28, 2015 at 4:34 PM, Sourabh Chandak 
wrote:

> I also have the same use case as Augustus, and have some basic questions
> about recovery from checkpoint. I have a 10 node Kafka cluster and a 30
> node Spark cluster running streaming job, how is the (topic, partition)
> data handled in checkpointing. The scenario I want to understand is, in
> case of node failure how will a new node know the checkpoint of the failed
> node?
> The amount of data we have is huge and we can't run from the smallest
> offset.
>
> Thanks,
> Sourabh
>
> On Mon, Sep 28, 2015 at 11:43 AM, Augustus Hong wrote:
>
>> Got it, thank you!
>>
>>
>> On Mon, Sep 28, 2015 at 11:37 AM, Cody Koeninger 
>> wrote:
>>
>>> Losing worker nodes without stopping is definitely possible.  I haven't
>>> had much success adding workers to a running job, but I also haven't spent
>>> much time on it.
>>>
>>> If you're restarting with the same jar, you should be able to recover
>>> from checkpoint without losing data (usual caveats apply, e.g. you need
>>> enough kafka retention).  Make sure to test it though, as the code paths
>>> taken during recovery from checkpoint are not the same as on initial
>>> startup, and you can run into unexpected issues (e.g. authentication).
>>>
>>> On Mon, Sep 28, 2015 at 1:27 PM, Augustus Hong <
>>> augus...@branchmetrics.io> wrote:
>>>
>>>> Hey all,
>>>>
>>>> I'm evaluating using Spark Streaming with Kafka direct streaming, and I
>>>> have a couple of questions:
>>>>
>>>> 1.  Would it be possible to add / remove worker nodes without stopping
>>>> and restarting the spark streaming driver?
>>>>
>>>> 2.  I understand that we can enable checkpointing to recover from node
>>>> failures, and that it doesn't work across code changes.  What about in the
>>>> event that worker nodes failed due to load -> we added more worker nodes ->
>>>> restart Spark Streaming?  Would this incur data loss as well?
>>>>
>>>>
>>>> Best,
>>>> Augustus
>>>>
>>>> --
>>>> Augustus Hong
>>>>  Data Analytics | Branch Metrics
>>>>  m 650-391-3369 | e augus...@branch.io
>>>>
>>>
>>>
>>
>>
>> --
>> Augustus Hong
>>  Data Analytics | Branch Metrics
>>  m 650-391-3369 | e augus...@branch.io
>>
>
>


Re: java.lang.ClassCastException (org.apache.spark.scheduler.ResultTask cannot be cast to org.apache.spark.scheduler.Task)

2015-09-28 Thread Ted Yu
Please see SPARK-8142

On Mon, Sep 28, 2015 at 1:45 PM, amitra123  wrote:

> Hello All,
>
> I am trying to write a very simply Spark Streaming example problem and I m
> getting this exception. I am new to Spark and I am not quite sure why this
> exception is thrown. Wondering if someone has any clues. Here is the
> backtrace. I am running this on Spark 1.5.0.
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in
> stage 0.0 failed 4 times, most recent failure: Lost task 9.3 in stage 0.0
> (TID 17, 112.XXX.XXX.XXX): java.lang.ClassCastException:
> org.apache.spark.scheduler.ResultTask cannot be cast to
> org.apache.spark.scheduler.Task
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:898)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:896)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:896)
> at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:222)
> at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:47)
> at com.ay.spark.example.SparkWordCountStreaming.outputRdd(SparkWordCountStreaming.java:172)
> at com.ay.spark.example.SparkWordCountStreaming.lambda$start$b852b88$1(SparkWordCountStreaming.java:123)
> at com.ay.spark.example.SparkWordCountStreaming$$Lambda$9/1640832113.call(Unknown Source)
> at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:315)
> at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:315)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
> at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> at
>
> 

Re: HDFS small file generation problem

2015-09-28 Thread Jörn Franke
Use hadoop archive

On Sun, Sep 27, 2015 at 3:36 PM,  wrote:

> Hello,
> I'm still investigating the small-file problem generated by my
> Spark Streaming jobs.
> My Spark Streaming jobs receive a lot of small events (avg
> 10kb), and I have to store them in HDFS so that PIG jobs can
> process them on demand.
> The problem is that I generate a lot of small files in HDFS
> (several million), which can be problematic.
> I looked into using HBase or archive files, but in the end I don't want
> to do that.
> So, what about this solution:
> - Spark Streaming generates several million small files in HDFS on the
> fly
> - Each night I merge them into one big daily file
> - I launch my PIG jobs on this big file?
>
> Another question I have:
> - Is it possible to append my events to a big (daily) file on the fly?
>
> Thanks a lot
> Nicolas
>
>
>


Re: Spark Streaming Log4j Inside Eclipse

2015-09-28 Thread Ashish Soni
I am not running it with spark-submit; I am running locally inside the
Eclipse IDE. How do I set this from Java code?

Ashish

On Mon, Sep 28, 2015 at 10:42 AM, Adrian Tanase  wrote:

> You also need to provide it as parameter to spark submit
>
> http://stackoverflow.com/questions/28840438/how-to-override-sparks-log4j-properties-per-driver
>
> From: Ashish Soni
> Date: Monday, September 28, 2015 at 5:18 PM
> To: user
> Subject: Spark Streaming Log4j Inside Eclipse
>
> I need to turn off the verbose logging of Spark Streaming Code when i am
> running inside eclipse i tried creating a log4j.properties file and placed
> inside /src/main/resources but i do not see it getting any effect , Please
> help as not sure what else needs to be done to change the log at DEBUG or
> WARN
>


Spark REST Job server feedback?

2015-09-28 Thread Ramirez Quetzal
Does anyone have feedback on using the Hue / Spark Job Server REST servers?

http://gethue.com/how-to-use-the-livy-spark-rest-job-server-for-interactive-spark/

https://github.com/spark-jobserver/spark-jobserver

Many thanks,

Rami


Re: Notification on Spark Streaming job failure

2015-09-28 Thread Chen Song
I am also interested specifically in monitoring and alerting on Spark
Streaming jobs. It would be helpful to get some general guidelines or advice
from people who have implemented something like this.

On Fri, Sep 18, 2015 at 2:35 AM, Krzysztof Zarzycki 
wrote:

> Hi there Spark Community,
> I would like to ask for your advice: I'm running Spark Streaming jobs in
> production. Sometimes these jobs fail and I would like to get an email
> notification about it. Do you know how I can set up Spark to notify me by
> email if my job fails? Or do I have to use an external monitoring tool?
> I'm thinking of the following options:
> 1. As I'm running those jobs on YARN, somehow monitor the YARN jobs. I
> looked for this as well but couldn't find any YARN feature to do it.
> 2. Run the Spark Streaming job in some scheduler, like Oozie, Azkaban,
> Luigi. Those are designed for batch jobs rather than streaming, but could
> work. Has anyone tried that?
> 3. Run the job driver under the "monit" tool, catch the failure and send
> an email about it. Currently I'm deploying in yarn-cluster mode and I would
> need to give that up to run under monit.
> 4. Set up a monitoring tool (like Graphite, Ganglia, Prometheus) and use
> Spark metrics, then implement alerting in it. Can I get information
> about failed jobs from Spark metrics?
> 5. As 4, but implement my own custom job metrics and monitor them.
>
> What's your opinion about my options? How do you solve this
> problem? Anything Spark-specific?
> I'll be grateful for any advice on this subject.
> Thanks!
> Krzysiek
>
>


-- 
Chen Song


Re: Update cassandra rows problem

2015-09-28 Thread Ted Yu
Please consider posting on DataStax's mailing list for questions about the
Spark Cassandra connector.

On Mon, Sep 28, 2015 at 6:59 AM, amine_901 
wrote:

> Hello all,
> I'm using Spark 1.2 with Spark Cassandra connector 1.2.3,
> and I'm trying to update some rows of a table:
>
> example:
> CREATE TABLE myTable (
> a text,
> b text,
> c text,
> date timestamp,
> d text,
> e text static,
> f text static,
> PRIMARY KEY ((a, b, c), date, d)
> ) WITH CLUSTERING ORDER BY (date ASC, d ASC)
>
> val interactions = sc.cassandraTable[(String, String, String, DateTime,
> String, String)]("keySpace", "myTable").
> select("a","b","c","date", "d", "e","f")
>   val empty = interactions.filter(r => r._6 == null).cache()
>   empty.count()
>
> I just count the number of rows containing null for "e" and then replace
> them with the value of "b":
>
>  val update_inter = empty.map( r =>  (r._1,r._2, r._3, r._4, r._5, r._2))
>   update_inter.saveToCassandra("keySpace", "myTable",
> SomeColumns("a","b","c","date", "d", "e", "f"))
>
> This works when I check in cqlsh, but I still get the value null when I
> request the same rows through the Spark Cassandra connector.
>
> Is this a bug in the Spark Cassandra connector? Thanks for your help.
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Update-cassandra-rows-problem-tp24844.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Interactively search Parquet-stored data using Spark Streaming and DataFrames

2015-09-28 Thread Նարեկ Գալստեան
I have a significant amount of data stored on my Hadoop HDFS as Parquet
files. I am using Spark Streaming to interactively receive queries from a
web server and transform the received queries into SQL to run on my data
using Spark SQL.

In this process I need to run several SQL queries and then return some
aggregate result by merging or subtracting the results of individual
queries.

Are there any ways I could optimize and increase the speed of the process
by, for example, running queries on already received dataframes rather than
the whole database?

Is there a better way to interactively query the Parquet stored data and
give results?

Thank you!



Narek Galstyan

Նարեկ Գալստյան


Re: Weird worker usage

2015-09-28 Thread Bryan Jeffrey
Nikunj,

No, I'm not calling set w/ master at all.  This ended up being a foolish
configuration problem with my slaves file.

Regards,

Bryan Jeffrey

On Fri, Sep 25, 2015 at 11:20 PM, N B  wrote:

> Bryan,
>
> By any chance, are you calling SparkConf.setMaster("local[*]") inside your
> application code?
>
> Nikunj
>
> On Fri, Sep 25, 2015 at 9:56 AM, Bryan Jeffrey 
> wrote:
>
>> Looking at this further, it appears that my Spark Context is not
>> correctly setting the Master name.  I see the following in logs:
>>
>> 15/09/25 16:45:42 INFO DriverRunner: Launch Command:
>> "/usr/lib/jvm/java-7-openjdk-amd64/jre/bin/java" "-cp"
>> "/spark/spark-1.4.1/sbin/../conf/:/spark/spark-1.4.1/assembly/target/scala-2.10/spark-assembly-1.4.1-hadoop2.2.0.jar:/spark/spark-1.4.1/lib_managed/jars/datanucleus-api-jdo-3.2.6.jar:/spark/spark-1.4.1/lib_managed/jars/datanucleus-rdbms-3.2.9.jar:/spark/spark-1.4.1/lib_managed/jars/datanucleus-core-3.2.10.jar"
>> "-Xms512M" "-Xmx512M" "-Dakka.loglevel=WARNING"
>> "-Dspark.default.parallelism=6" "-Dspark.rpc.askTimeout=10"
>> "-Dspark.app.name=MainClass" "-Dspark.master=spark://sparkserver:7077"
>> "-Dspark.driver.supervise=true" "-Dspark.logConf=true"
>> "-Dspark.jars=file:/tmp/MainClass-1.0-SNAPSHOT-jar-with-dependencies.jar"
>> "-Dspark.streaming.receiver.maxRate=500" "-XX:MaxPermSize=256m"
>> "org.apache.spark.deploy.worker.DriverWrapper"
>> "akka.tcp://sparkWorker@10.0.0.6:48077/user/Worker"
>> "/spark/spark-1.4.1/work/driver-20150925164617-/MainClass-1.0-SNAPSHOT-jar-with-dependencies.jar"
>> "MainClass" "--checkpoint" "/tmp/sparkcheckpoint" "--broker"
>> "kafkaBroker:9092" "--topic" "test" "--numStreams" "9"
>> "--threadParallelism" "9"
>> 15/09/25 16:45:43 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>> 15/09/25 16:45:43 INFO SecurityManager: Changing view acls to: root
>> 15/09/25 16:45:43 INFO SecurityManager: Changing modify acls to: root
>> 15/09/25 16:45:43 INFO SecurityManager: SecurityManager: authentication
>> disabled; ui acls disabled; users with view permissions: Set(root); users
>> with modify permissions: Set(root)
>> 15/09/25 16:45:44 INFO Slf4jLogger: Slf4jLogger started
>> 15/09/25 16:45:45 INFO Utils: Successfully started service 'Driver' on
>> port 59670.
>> 15/09/25 16:45:45 INFO WorkerWatcher: Connecting to worker
>> akka.tcp://sparkWorker@10.0.0.6:48077/user/Worker
>> 15/09/25 16:45:45 INFO MainClass: MainClass - Setup Logger
>> 15/09/25 16:45:45 INFO WorkerWatcher: Successfully connected to
>> akka.tcp://sparkWorker@10.0.0.6:48077/user/Worker
>> 15/09/25 16:45:45 INFO Checkpoint: Checkpoint directory
>> /tmp/sparkcheckpoint does not exist
>> 15/09/25 16:45:45 INFO MainClass: Setting up streaming context with
>> configuration: org.apache.spark.SparkConf@56057cbf and time window 2000
>> ms
>> 15/09/25 16:45:45 INFO SparkContext: Running Spark version 1.4.1
>> 15/09/25 16:45:45 INFO SparkContext: Spark configuration:
>> spark.app.name=MainClass
>> spark.default.parallelism=6
>> spark.driver.supervise=true
>> spark.jars=file:/tmp/OinkSpark-1.0-SNAPSHOT-jar-with-dependencies.jar
>> spark.logConf=true
>> spark.master=local[*]
>> spark.rpc.askTimeout=10
>> spark.streaming.receiver.maxRate=500
>>
>> As you can see, despite -Dspark.master=spark://sparkserver:7077, the
>> streaming context still registers the master as local[*].  Any idea why?
>>
>> Thank you,
>>
>> Bryan Jeffrey
>>
>>
>>
>


Re: Spark Streaming Log4j Inside Eclipse

2015-09-28 Thread Adrian Tanase
You also need to provide it as a parameter to spark-submit:
http://stackoverflow.com/questions/28840438/how-to-override-sparks-log4j-properties-per-driver

From: Ashish Soni
Date: Monday, September 28, 2015 at 5:18 PM
To: user
Subject: Spark Streaming Log4j Inside Eclipse

I need to turn off the verbose logging of Spark Streaming code when I am
running inside Eclipse. I tried creating a log4j.properties file and placing
it inside /src/main/resources, but I do not see it having any effect. Please
help, as I am not sure what else needs to be done to change the log level to
DEBUG or WARN.


Re: Spark Streaming Log4j Inside Eclipse

2015-09-28 Thread Shixiong Zhu
You can use JavaSparkContext.setLogLevel to set the log level in your code.

Best Regards,
Shixiong Zhu

2015-09-28 22:55 GMT+08:00 Ashish Soni :

> I am not running it using spark-submit; I am running it locally inside the
> Eclipse IDE. How do I set this from Java code?
>
> Ashish
>
> On Mon, Sep 28, 2015 at 10:42 AM, Adrian Tanase  wrote:
>
>> You also need to pass it as a parameter to spark-submit:
>>
>> http://stackoverflow.com/questions/28840438/how-to-override-sparks-log4j-properties-per-driver
>>
>> From: Ashish Soni
>> Date: Monday, September 28, 2015 at 5:18 PM
>> To: user
>> Subject: Spark Streaming Log4j Inside Eclipse
>>
>> I need to turn off the verbose logging of my Spark Streaming code when I am
>> running it inside Eclipse. I tried creating a log4j.properties file and
>> placing it in /src/main/resources, but it does not seem to have any effect.
>> Please help, as I am not sure what else needs to be done to change the log
>> level to DEBUG or WARN.
>>
>
>
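
A note on the log4j.properties route discussed in this thread: Spark 1.x logs
through log4j 1.x, which looks for a file named log4j.properties at the root of
the runtime classpath, so in Eclipse it is worth checking that
src/main/resources really is a source/resource folder of the project. Below is
a minimal sketch of such a file, modelled on the log4j.properties.template that
ships in Spark's conf directory, with the root level lowered to WARN. The
org.apache.spark override at the end is optional and only an illustration.

```
# Log everything at WARN and above to the console (stderr)
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Optional: quieten only Spark's own loggers
log4j.logger.org.apache.spark=WARN
```

If the file lives somewhere else, log4j 1.x can be pointed at it with the JVM
argument -Dlog4j.configuration=file:/path/to/log4j.properties (the path is a
placeholder) in the VM arguments of the Eclipse run configuration.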


Re: how to handle OOMError from groupByKey

2015-09-28 Thread Fabien Martin
You can try to reduce the number of containers in order to increase their
memory.

2015-09-28 9:35 GMT+02:00 Akhil Das :

> You can try to increase the number of partitions to get rid of the OOM
> errors. Also try to use reduceByKey instead of groupByKey.
>
> Thanks
> Best Regards
>
> On Sat, Sep 26, 2015 at 1:05 AM, Elango Cheran 
> wrote:
>
>> Hi everyone,
>> I have an RDD of the format (user: String, timestamp: Long, state:
>> Boolean).  My task involves converting the states, where on/off is
>> represented as true/false, into intervals of 'on' of the format (beginTs:
>> Long, endTs: Long).  So this task requires me, per user, to line up all of
>> the on/off states so that I can compute when it is on, since the
>> calculation is neither associative nor commutative.
>>
>> So there are 2 main operations that I'm trying to accomplish together:
>> 1. group by each user
>> 2. sort by time -- keep all of the states in sorted order by time
>>
>> The main code inside the method that does the grouping by user and sorting
>> by time looks roughly like this:
>>
>>
>> // RDD starts off in format (user, ts, state),
>> // of type RDD[(String, Long, Boolean)]
>> val grouped = keyedStatesRDD.groupByKey
>> // after .groupByKey, format for RDD is (user, seq-of-(ts, state)),
>> // of type RDD[(String, Iterable[(Long, Boolean)])]
>> // take the sequence of (ts, state) per user, sort, get intervals
>> val groupedIntervals = grouped.mapValues(
>>   states => {
>>     val sortedStates = states.toSeq.sortBy(_._1)
>>     val intervals = DFUtil.statesToIntervals(sortedStates)
>>     val intervalsList = bucketDurations.map{case(k,v) =>
>>       (k,v)}(collection.breakOut).sortBy(_._1)
>>     intervalsList
>>   }
>> )
>> // after .mapValues, new format for RDD is (user, seq-of-(startTime, endTime)),
>> // of type RDD[(String, IndexedSeq[(Long, Long)])]
>>
>>
>> When I run my Spark job with 1 day's worth of data, the job completes
>> successfully.  When I run with 1 month's or 1 year's worth of data, that
>> method is where my Spark job consistently crashes with
>> OutOfMemoryErrors.  I need to run on the full year's worth of data.
>>
>> My suspicion is that the groupByKey is the problem (it's pulling all of
>> the matching data values into a single executor's heap as a plain Scala
>> Iterable).  But alternatives of doing sortByKey on the RDD first before
>> grouping, or sortByKey followed by a fold[ByKey] or aggregate[ByKey] don't
>> quite apply in my scenario because my operation is not associative (can't
>> combine per-partition results) and I still need to group by users before
>> doing a foldLeft.
>>
>> I've definitely thought about the issue before and come across users with
>> issues that are similar but not exactly the same:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html
>>
>> http://mail-archives.us.apache.org/mod_mbox/spark-user/201501.mbox/%3ccaae1cqr8rd8ypebcmbjwfhm+lxh6nw4+r+uharx00psk_sh...@mail.gmail.com%3E
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Partition-sorting-by-Spark-framework-td18213.html
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Alternatives-to-groupByKey-td20293.html
>>
>> And this Jira seems relevant too:
>> https://issues.apache.org/jira/browse/SPARK-3655
>>
>> The amount of memory that I'm using is 2g per executor, and I can't go
>> higher than that because each executor gets a YARN container from nodes
>> with 16 GB of RAM and 5 YARN containers allowed per node.
>>
>> So I'd like to know if there's an easy solution to executing my logic on
>> my full dataset in Spark.
>>
>> Thanks!
>>
>> -- Elango
>>
>
>
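
For the per-user step itself, the interval extraction can be written as a
single forward pass over time-ordered states, which holds only the currently
open interval in memory rather than a second sorted copy of the group. The
sketch below is plain Java with no Spark dependency. DFUtil.statesToIntervals
is not shown in the thread, so the class and method here are hypothetical
stand-ins, and dropping a trailing 'on' that has no matching 'off' is an
assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class StatesToIntervals {

    /** One observation for a single user: a timestamp and an on/off flag. */
    static final class State {
        final long ts;
        final boolean on;

        State(long ts, boolean on) {
            this.ts = ts;
            this.on = on;
        }
    }

    /**
     * Folds states into {beginTs, endTs} pairs covering the 'on' periods.
     * The input MUST already be ordered by timestamp; this method does not sort.
     * Assumption: an 'on' that is never followed by an 'off' is dropped.
     */
    static List<long[]> statesToIntervals(Iterable<State> sortedStates) {
        List<long[]> intervals = new ArrayList<>();
        boolean currentlyOn = false;
        long beginTs = 0L;
        for (State s : sortedStates) {
            if (s.on && !currentlyOn) {
                // off -> on: open a new interval
                beginTs = s.ts;
                currentlyOn = true;
            } else if (!s.on && currentlyOn) {
                // on -> off: close the open interval
                intervals.add(new long[] {beginTs, s.ts});
                currentlyOn = false;
            }
            // a repeated 'on' while on, or 'off' while off, changes nothing
        }
        return intervals;
    }

    public static void main(String[] args) {
        List<State> states = Arrays.asList(
            new State(10L, true), new State(20L, true), new State(30L, false),
            new State(40L, false), new State(50L, true), new State(70L, false),
            new State(80L, true));
        for (long[] iv : statesToIntervals(states)) {
            System.out.println(iv[0] + " -> " + iv[1]);
        }
        // prints:
        // 10 -> 30
        // 50 -> 70
    }
}
```

Because the pass only needs an iterator in timestamp order, it could in
principle be fed from a secondary sort (for example
repartitionAndSortWithinPartitions on a composite (user, ts) key, available
since Spark 1.2) instead of groupByKey followed by an in-memory sort. Whether
that fits this particular job is untested here.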


Spark Streaming Log4j Inside Eclipse

2015-09-28 Thread Ashish Soni
Hi All ,

I need to turn off the verbose logging of my Spark Streaming code when I am
running it inside Eclipse. I tried creating a log4j.properties file and placing
it in /src/main/resources, but it does not seem to have any effect. Please
help, as I am not sure what else needs to be done to change the log level to
DEBUG or WARN.

Ashish


Re: Strange shuffle behaviour difference between Zeppelin and Spark-shell

2015-09-28 Thread Rick Moritz
I've finally been able to pick this up again, after upgrading to Spark
1.4.1, because my code used the HiveContext, which runs fine in the REPL
(be it via Zeppelin or the shell) but won't work with spark-submit.
With 1.4.1, I have actually managed to get a result with the Spark shell,
but only after 3847.802237 seconds, and in particular the last stage took
1320.672 seconds.
This was after I used coalesce to balance the workload initially, since a
Hive filter I applied would normally make for a skewed distribution of the
data onto the nodes.
Nonetheless, the same code (even without the coalesce) would work much
faster in Zeppelin (around 1200 seconds with 1.4.0), and as a spark-submit
job the run time was just a tenth, at 446.657534 seconds for the entire job
and notably 38.961 seconds for the final stage.

Again, there is a huge difference in the amount of data that gets
shuffled/spilled (which leads to much earlier OOM-conditions), when using
spark-shell.
What could be the reason for this different behaviour using very similar
configurations and identical data, machines and code (identical DAGs and
sources) and identical spark binaries? Why would code launched from
spark-shell generate more shuffled data for the same number of shuffled
tuples?

An analysis would be much appreciated.

Best,

Rick

On Wed, Aug 19, 2015 at 2:47 PM, Rick Moritz  wrote:

> oops, forgot to reply-all on this thread.
>
> -- Forwarded message --
> From: Rick Moritz 
> Date: Wed, Aug 19, 2015 at 2:46 PM
> Subject: Re: Strange shuffle behaviour difference between Zeppelin and
> Spark-shell
> To: Igor Berman 
>
>
> Those values are not explicitly set, and attempting to read their values
> results in 'java.util.NoSuchElementException: spark.shuffle.spill.compress'.
> What I mean by the volume per element being larger is illustrated in my
> original post: for each case the number of elements is identical, but the
> volume of data required to obtain/manage these elements is many times
> greater.
>
> The only difference used to be that Zeppelin had FAIR scheduling over FIFO
> scheduling for spark-shell. I just verified that spark-shell with FAIR
> scheduling makes no difference. The only other difference in the
> environment lies in some class-path variables which should only affect
> method availability, not actual usage.
>
> Another fact to note: Spark assembly (1.4.0-rc4) was built with provided
> hadoop dependencies (build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0
> -Phadoop-provided -Phive -Phive-thriftserver -Psparkr -DskipTests clean
> package) for 2.6.0 from Hortonworks, while Zeppelin was built with
> dependencies against 2.6.0 from Maven central.
>
> On Wed, Aug 19, 2015 at 2:08 PM, Igor Berman 
> wrote:
>
>> So what is your case for the version differences?
>> What do you mean by "in spark-shell the volume per element is much larger"?
>> Can you verify that the configuration in the Spark UI (under the Environment
>> tab) is the same?
>> If you suspect compression, then check the following properties:
>> spark.shuffle.compress
>> spark.shuffle.spill.compress
>> spark.io.compression.codec
>> spark.rdd.compress
>>
>>
>>
>> On 19 August 2015 at 15:03, Rick Moritz  wrote:
>>
>>> Number of partitions and even size look relatively similar - except in
>>> spark-shell the volume per element is much larger, especially in later
>>> stages. That's when shuffles start to spill. Zeppelin creates almost no
>>> spills at all. The number of elements per partition are the same for both
>>> setups, but with very different data volume in/out. Almost as though
>>> compression was used in one case, and not in another, or as though
>>> shuffling is somehow less specific, and more nodes get data that they
>>> ultimately don't process at all. The same shuffling algorithm appears to be
>>> at work in each case, if the partitioning of the number of elements is
>>> anything to go by.
>>>
>>> On Wed, Aug 19, 2015 at 1:58 PM, Igor Berman 
>>> wrote:
>>>
>>>> I would compare the Spark UI metrics for both cases and look for any
>>>> differences (number of partitions, number of spills, etc.).
>>>> Why can't you make the REPL consistent with the Zeppelin Spark version?
>>>> The RC might have issues...
>>>>
>>>>
>>>>
>>>>
>>>> On 19 August 2015 at 14:42, Rick Moritz  wrote:
>>>>
>>>>> No, the setup is one driver with 32g of memory, and three executors
>>>>> each with 8g of memory in both cases. No core-number has been specified,
>>>>> thus it should default to single-core (though I've seen the yarn-owned
>>>>> jvms wrapping the executors take up to 3 cores in top). That is, unless,
>>>>> as I suggested, there are different defaults for the two means of job
>>>>> submission that come into play in a non-transparent fashion (i.e. not
>>>>> visible in SparkConf).
>>>>>
>>>>> On Wed, Aug 19, 2015 at 1:36 PM, Igor Berman

ML Pipeline

2015-09-28 Thread Yasemin Kaya
Hi,

I am using Spark 1.5 and ML Pipeline. I create the model, then give it
unlabeled data to get the probabilities and predictions. When I try to view
the results, I get an error.

// creating model
final PipelineModel model = pipeline.fit(trainingData);

// NOTE: the generic type parameters were lost in the archive; <Row> follows
// from the code, while <String, Vector> is inferred from the schema below.
JavaRDD<Row> rowRDD1 = unlabeledTest
    .map(new Function<Tuple2<String, Vector>, Row>() {

      @Override
      public Row call(Tuple2<String, Vector> arg0)
          throws Exception {
        return RowFactory.create(arg0._1(), arg0._2());
      }
    });
// creating dataframe from row
DataFrame production = sqlContext.createDataFrame(rowRDD1,
    new StructType(new StructField[] {
        new StructField("id", DataTypes.StringType, false,
            Metadata.empty()),
        new StructField("features", (new VectorUDT()), false,
            Metadata.empty()) }));

DataFrame predictionsProduction = model.transform(production);
// produces the error
predictionsProduction.select("id","features","probability").show(5);

Here is my code. Am I creating rowRDD1 or production incorrectly?
The error is: java.util.NoSuchElementException: key not found: 1.0
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:58)
.

How can I solve it ? Thanks.

Have a nice day,
yasemin

-- 
hiç ender hiç


laziness in textFile reading from HDFS?

2015-09-28 Thread davidkl
Hello,

I need to process a significant amount of data every day, about 4TB. This
will be processed in batches of about 140GB. The cluster this will be
running on doesn't have enough memory to hold the dataset at once, so I am
trying to understand how this works internally.

When using textFile to read an HDFS folder (containing multiple files), I
understand that the number of partitions created is equal to the number of
HDFS blocks, correct? Are those created lazily? I mean, if the number
of blocks/partitions is larger than the number of cores/threads the Spark
driver was launched with (N), are N partitions created initially and then
the rest when required? Or are all those partitions created up front?

I want to avoid reading the whole dataset into memory just to spill it out to
disk if there is not enough memory.

Thanks! 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/laziness-in-textFile-reading-from-HDFS-tp24837.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: HDFS is undefined

2015-09-28 Thread Akhil Das
For some reason Spark isn't picking up your Hadoop configuration. Did you
download a Spark build compiled for the Hadoop version that you have in the
cluster?

Thanks
Best Regards

On Fri, Sep 25, 2015 at 7:43 PM, Angel Angel 
wrote:

> hello,
> I am running the spark application.
>
> I have installed the cloudera manager.
> it includes the spark version 1.2.0
>
>
> But now I want to use Spark version 1.4.0.
>
> It is also working fine.
>
> But when I try to access HDFS from Spark 1.4.0 in Eclipse, I get
> the following error.
>
> "Exception in thread "main" java.nio.file.FileSystemNotFoundException:
> Provider "hdfs" not installed "
>
>
> My spark 1.4.0 spark-env.sh file is
>
> export HADOOP_CONF_DIR=/etc/hadoop/conf
> export SPARK_HOME=/root/spark-1.4.0
>
>
> export
> DEFAULT_HADOOP_HOME=/opt/cloudera/parcels/CDH-5.3.5-1.cdh5.3.5.p0.4/lib/hadoop
>
> still i am getting the error.
>
> please give me suggestions.
>
> Thanking You,
> Sagar Jadhav.
>


Re: how to handle OOMError from groupByKey

2015-09-28 Thread Akhil Das
You can try to increase the number of partitions to get rid of the OOM
errors. Also try to use reduceByKey instead of groupByKey.

Thanks
Best Regards

On Sat, Sep 26, 2015 at 1:05 AM, Elango Cheran 
wrote:

> Hi everyone,
> I have an RDD of the format (user: String, timestamp: Long, state:
> Boolean).  My task involves converting the states, where on/off is
> represented as true/false, into intervals of 'on' of the format (beginTs:
> Long, endTs: Long).  So this task requires me, per user, to line up all of
> the on/off states so that I can compute when it is on, since the
> calculation is neither associative nor commutative.
>
> So there are 2 main operations that I'm trying to accomplish together:
> 1. group by each user
> 2. sort by time -- keep all of the states in sorted order by time
>
> The main code inside the method that does the grouping by user and sorting
> by time looks roughly like this:
>
>
> // RDD starts off in format (user, ts, state),
> // of type RDD[(String, Long, Boolean)]
> val grouped = keyedStatesRDD.groupByKey
> // after .groupByKey, format for RDD is (user, seq-of-(ts, state)),
> // of type RDD[(String, Iterable[(Long, Boolean)])]
> // take the sequence of (ts, state) per user, sort, get intervals
> val groupedIntervals = grouped.mapValues(
>   states => {
>     val sortedStates = states.toSeq.sortBy(_._1)
>     val intervals = DFUtil.statesToIntervals(sortedStates)
>     val intervalsList = bucketDurations.map{case(k,v) =>
>       (k,v)}(collection.breakOut).sortBy(_._1)
>     intervalsList
>   }
> )
> // after .mapValues, new format for RDD is (user, seq-of-(startTime, endTime)),
> // of type RDD[(String, IndexedSeq[(Long, Long)])]
>
>
> When I run my Spark job with 1 day's worth of data, the job completes
> successfully.  When I run with 1 month's or 1 year's worth of data, that
> method is where my Spark job consistently crashes with
> OutOfMemoryErrors.  I need to run on the full year's worth of data.
>
> My suspicion is that the groupByKey is the problem (it's pulling all of
> the matching data values into a single executor's heap as a plain Scala
> Iterable).  But alternatives of doing sortByKey on the RDD first before
> grouping, or sortByKey followed by a fold[ByKey] or aggregate[ByKey] don't
> quite apply in my scenario because my operation is not associative (can't
> combine per-partition results) and I still need to group by users before
> doing a foldLeft.
>
> I've definitely thought about the issue before and come across users with
> issues that are similar but not exactly the same:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html
>
> http://mail-archives.us.apache.org/mod_mbox/spark-user/201501.mbox/%3ccaae1cqr8rd8ypebcmbjwfhm+lxh6nw4+r+uharx00psk_sh...@mail.gmail.com%3E
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Partition-sorting-by-Spark-framework-td18213.html
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Alternatives-to-groupByKey-td20293.html
>
> And this Jira seems relevant too:
> https://issues.apache.org/jira/browse/SPARK-3655
>
> The amount of memory that I'm using is 2g per executor, and I can't go
> higher than that because each executor gets a YARN container from nodes
> with 16 GB of RAM and 5 YARN containers allowed per node.
>
> So I'd like to know if there's an easy solution to executing my logic on
> my full dataset in Spark.
>
> Thanks!
>
> -- Elango
>


"recommendProductsForUsers" makes worker node crash

2015-09-28 Thread wanbo
I have two workers running the recommendation job. Since moving to Spark
v1.4.0, I have wanted to try the method "recommendProductsForUsers". This
method makes my worker nodes crash, and connections to them time out. If I
don't add a new worker node, what should I do?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/recommendProductsForUsers-makes-worker-node-crash-tp24836.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: HDFS is undefined

2015-09-28 Thread Ted Yu
Please post the question on vendor's forum. 

> On Sep 25, 2015, at 7:13 AM, Angel Angel  wrote:
> 
> hello,
> I am running the spark application.
> 
> I have installed the cloudera manager.
> it includes the spark version 1.2.0
> 
> 
> But now I want to use Spark version 1.4.0.
>
> It is also working fine.
>
> But when I try to access HDFS from Spark 1.4.0 in Eclipse, I get the
> following error.
> 
> "Exception in thread "main" java.nio.file.FileSystemNotFoundException: 
> Provider "hdfs" not installed "
> 
> 
> My spark 1.4.0 spark-env.sh file is  
> 
> export HADOOP_CONF_DIR=/etc/hadoop/conf
> export SPARK_HOME=/root/spark-1.4.0
> 
> 
> export 
> DEFAULT_HADOOP_HOME=/opt/cloudera/parcels/CDH-5.3.5-1.cdh5.3.5.p0.4/lib/hadoop
> 
> still i am getting the error.
> 
> please give me suggestions.
> 
> Thanking You,
> Sagar Jadhav. 




Master getting down with Memory issue.

2015-09-28 Thread Saurav Sinha
Hi Spark Users,

I am running some Spark jobs that run every hour. After running for
12 hours, the master gets killed with the exception

*java.lang.OutOfMemoryError: GC overhead limit exceeded*

It looks like there is a memory issue in the Spark master.
This is a blocker for us. Can anyone please suggest anything?


I noticed the same kind of issue with the Spark history server.

In my job I have to monitor whether each job completed successfully. For that I
query the status with curl, but once the number of apps grows beyond 80, the
history server starts responding with a delay: it takes more than 5
minutes to return the status of the jobs.

I am running Spark 1.4.1 in standalone mode on a 5-machine cluster.

Please suggest a solution for the memory issue; it is a blocker.

Thanks,
Saurav Sinha

-- 
Thanks and Regards,

Saurav Sinha

Contact: 9742879062


Re: Master getting down with Memory issue.

2015-09-28 Thread Akhil Das
Depends on the data volume that you are operating on.

Thanks
Best Regards

On Mon, Sep 28, 2015 at 5:12 PM, Saurav Sinha 
wrote:

> Hi Akhil,
>
> My job creates 47 stages in one cycle, and it runs every hour.
> Can you please suggest what the optimum number of stages in a Spark job is?
>
> How can we reduce the number of stages in a Spark job?
>
> Thanks,
> Saurav Sinha
>
> On Mon, Sep 28, 2015 at 3:23 PM, Saurav Sinha 
> wrote:
>
>> Hi Akhil,
>>
>> Can you please explain to me how increasing the number of partitions (which
>> is a worker-node concern) will help?
>>
>> The issue is that my master is getting the OOM.
>>
>> Thanks,
>> Saurav Sinha
>>
>> On Mon, Sep 28, 2015 at 2:32 PM, Akhil Das 
>> wrote:
>>
>>> This behavior totally depends on the job that you are doing. Usually
>>> increasing the # of partitions will sort out this issue. It would be good
>>> if you can paste the code snippet or explain what type of operations that
>>> you are doing.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Mon, Sep 28, 2015 at 11:37 AM, Saurav Sinha 
>>> wrote:
>>>
>>>> Hi Spark Users,
>>>>
>>>> I am running some spark jobs which is running every hour.After running
>>>> for 12 hours master is getting killed giving exception as
>>>>
>>>> *java.lang.OutOfMemoryError: GC overhead limit exceeded*
>>>>
>>>> It look like there is some memory issue in spark master.
>>>> Spark Master is blocker. Any one please suggest me any thing.
>>>>
>>>>
>>>> Same kind of issue I noticed with spark history server.
>>>>
>>>> In my job I have to monitor if job completed successfully, for that I
>>>> am hitting curl to get status but when no of jobs has increased to >80 apps
>>>> history server start responding with delay.Like it is taking more then 5
>>>> min to respond status of jobs.
>>>>
>>>> Running spark 1.4.1 in standalone mode on 5 machine cluster.
>>>>
>>>> Kindly suggest me solution for memory issue it is blocker.
>>>>
>>>> Thanks,
>>>> Saurav Sinha
>>>>
>>>> --
>>>> Thanks and Regards,
>>>>
>>>> Saurav Sinha
>>>>
>>>> Contact: 9742879062
>>>>
>>>
>>>
>>
>>
>> --
>> Thanks and Regards,
>>
>> Saurav Sinha
>>
>> Contact: 9742879062
>>
>
>
>
> --
> Thanks and Regards,
>
> Saurav Sinha
>
> Contact: 9742879062
>


log4j Spark-worker performance problem

2015-09-28 Thread vaibhavrtk
Hello

We need a lot of logging for our application: about 1000 lines need to be
logged per message we process, and we process 1000 msgs/sec. So the total
number of lines to be logged is 1000*1000 per second, all written to a
file. Will writing this many log lines significantly reduce Spark's processing
performance?
If yes, what could be the alternative?

Note: This much logging is required for the appropriate monitoring of the
application.
Let me know if more information is needed.

Thanks
Vaibhav




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/log4j-Spark-worker-performance-problem-tp24842.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


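To put the numbers in the message above in perspective, here is a
back-of-the-envelope estimate. The line rate comes from the message itself;
the average line length of 150 bytes is purely an assumption for illustration,
so substitute a measured value.

```java
final class LogVolumeEstimate {

    /** Log lines written per second across the whole application. */
    static long linesPerSecond(long linesPerMessage, long messagesPerSecond) {
        return linesPerMessage * messagesPerSecond;
    }

    /** Raw log bytes per second, before any compression or buffering. */
    static long bytesPerSecond(long linesPerSecond, long avgBytesPerLine) {
        return linesPerSecond * avgBytesPerLine;
    }

    public static void main(String[] args) {
        long lines = linesPerSecond(1000L, 1000L);  // figures from the message
        long assumedBytesPerLine = 150L;            // ASSUMPTION, not from the message
        long bytes = bytesPerSecond(lines, assumedBytesPerLine);

        System.out.println("lines/sec: " + lines);              // lines/sec: 1000000
        System.out.println("bytes/sec: " + bytes);              // bytes/sec: 150000000
        System.out.println("MB/sec:    " + bytes / 1_000_000);  // MB/sec:    150
    }
}
```

Under that assumption the total is on the order of 150 MB of log text per
second, spread over however many executors do the logging, which is the figure
to compare against the disk write bandwidth of the worker machines.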


Re: Spark 1.5.0 Not able to submit jobs using cluster URL

2015-09-28 Thread Akhil Das
Update the dependency version in your job's build file. Also make sure you
have updated the Spark version to 1.5.0 everywhere (in the cluster and in the
code).

Thanks
Best Regards

On Mon, Sep 28, 2015 at 11:29 AM, lokeshkumar  wrote:

> Hi forum
>
> I have just upgraded Spark from 1.4.0 to 1.5.0 and am running my old (1.4.0)
> jobs on 1.5.0 using the 'spark://ip:7077' cluster URL. But the job does not
> seem to start, and the server errors out with the incompatible class
> exception below:
>
> 15/09/28 11:20:07 INFO Master: 10.0.0.195:34702 got disassociated, removing it.
> 15/09/28 11:20:07 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkDriver@10.0.0.195:34702] has failed, address is now gated for [5000] ms. Reason: [org.apache.spark.deploy.DeployMessages$RegisterApplication; local class incompatible: stream classdesc serialVersionUID = 352674063933172066, local class serialVersionUID = -5495080032843259921]
> 15/09/28 11:20:27 ERROR ErrorMonitor: org.apache.spark.deploy.DeployMessages$RegisterApplication; local class incompatible: stream classdesc serialVersionUID = 352674063933172066, local class serialVersionUID = -5495080032843259921
> java.io.InvalidClassException: org.apache.spark.deploy.DeployMessages$RegisterApplication; local class incompatible: stream classdesc serialVersionUID = 352674063933172066, local class serialVersionUID = -5495080032843259921
>         at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:621)
>         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
>         at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>         at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>         at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
>         at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
>         at scala.util.Try$.apply(Try.scala:161)
>         at akka.serialization.Serialization.deserialize(Serialization.scala:98)
>         at akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:63)
>         at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
>         at scala.util.Try$.apply(Try.scala:161)
>         at akka.serialization.Serialization.deserialize(Serialization.scala:98)
>         at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
>         at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58)
>         at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58)
>         at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:76)
>         at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:935)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>         at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> The error at client is 'Association with remote system
> [akka.tcp://sparkMaster@lokesh-lt:7077] has failed, address is now gated for
> [5000] ms. Reason is: [Disassociated].'
>
> Please let me know if I am doing anything wrong,
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-5-0-Not-able-to-submit-jobs-using-cluster-URL-tp24835.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>
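
Some background on the exception quoted above: Java serialization stamps every
serialized class with a serialVersionUID, and deserialization throws
java.io.InvalidClassException when the value in the stream differs from that
of the class loaded locally. When a class does not declare the field, the JVM
derives the value from the structure of the class, so two builds of the same
class can disagree. The sketch below uses only the JDK; the two nested classes
are hypothetical and merely show how the value can be read.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

final class SerialVersionDemo {

    /** No explicit serialVersionUID: the JVM derives one from the class's shape. */
    static final class Implicit implements Serializable {
        int field;
    }

    /** Explicit serialVersionUID: stays fixed even if members are added later. */
    static final class Pinned implements Serializable {
        private static final long serialVersionUID = 42L;
        int field;
    }

    /** Returns the serialVersionUID that serialization would use for the class. */
    static long uidOf(Class<?> serializableClass) {
        return ObjectStreamClass.lookup(serializableClass).getSerialVersionUID();
    }

    public static void main(String[] args) {
        System.out.println("Implicit: " + uidOf(Implicit.class)); // value depends on the class shape
        System.out.println("Pinned:   " + uidOf(Pinned.class));   // Pinned:   42
    }
}
```

Running the same lookup for
org.apache.spark.deploy.DeployMessages$RegisterApplication on the classpath of
the driver and on that of the master would, presumably, show which side still
loads the 1.4.0 jar.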


Re: using multiple dstreams together (spark streaming)

2015-09-28 Thread Archit Thakur
@TD: Doesn't transformWith need both DStreams to have the same
slideDuration?
[Spark Version: 1.3.1]



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/using-multiple-dstreams-together-spark-streaming-tp9947p24839.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Master getting down with Memory issue.

2015-09-28 Thread Saurav Sinha
Hi Akhil,

Can you please explain to me how increasing the number of partitions (which is
a worker-node concern) will help?

The issue is that my master is getting the OOM.

Thanks,
Saurav Sinha

On Mon, Sep 28, 2015 at 2:32 PM, Akhil Das 
wrote:

> This behavior totally depends on the job that you are doing. Usually
> increasing the # of partitions will sort out this issue. It would be good
> if you can paste the code snippet or explain what type of operations that
> you are doing.
>
> Thanks
> Best Regards
>
> On Mon, Sep 28, 2015 at 11:37 AM, Saurav Sinha 
> wrote:
>
>> Hi Spark Users,
>>
>> I am running some spark jobs which is running every hour.After running
>> for 12 hours master is getting killed giving exception as
>>
>> *java.lang.OutOfMemoryError: GC overhead limit exceeded*
>>
>> It look like there is some memory issue in spark master.
>> Spark Master is blocker. Any one please suggest me any thing.
>>
>>
>> Same kind of issue I noticed with spark history server.
>>
>> In my job I have to monitor if job completed successfully, for that I am
>> hitting curl to get status but when no of jobs has increased to >80 apps
>> history server start responding with delay.Like it is taking more then 5
>> min to respond status of jobs.
>>
>> Running spark 1.4.1 in standalone mode on 5 machine cluster.
>>
>> Kindly suggest me solution for memory issue it is blocker.
>>
>> Thanks,
>> Saurav Sinha
>>
>> --
>> Thanks and Regards,
>>
>> Saurav Sinha
>>
>> Contact: 9742879062
>>
>
>


-- 
Thanks and Regards,

Saurav Sinha

Contact: 9742879062


Re: Master getting down with Memory issue.

2015-09-28 Thread Akhil Das
This behavior depends entirely on the job that you are running. Usually
increasing the number of partitions will sort out this issue. It would be good
if you could paste the code snippet or explain what type of operations
you are doing.

Thanks
Best Regards

On Mon, Sep 28, 2015 at 11:37 AM, Saurav Sinha 
wrote:

> Hi Spark Users,
>
> I am running some spark jobs which is running every hour.After running for
> 12 hours master is getting killed giving exception as
>
> *java.lang.OutOfMemoryError: GC overhead limit exceeded*
>
> It look like there is some memory issue in spark master.
> Spark Master is blocker. Any one please suggest me any thing.
>
>
> Same kind of issue I noticed with spark history server.
>
> In my job I have to monitor if job completed successfully, for that I am
> hitting curl to get status but when no of jobs has increased to >80 apps
> history server start responding with delay.Like it is taking more then 5
> min to respond status of jobs.
>
> Running spark 1.4.1 in standalone mode on 5 machine cluster.
>
> Kindly suggest me solution for memory issue it is blocker.
>
> Thanks,
> Saurav Sinha
>
> --
> Thanks and Regards,
>
> Saurav Sinha
>
> Contact: 9742879062
>


Re: Spark 1.5.0 Not able to submit jobs using cluster URL

2015-09-28 Thread Akhil Das
Well, for some reason your test is picking up the older jar then. The best way
to sort it out would be to create a build file for your project and declare the
dependencies there rather than adding the jars manually.

Thanks
Best Regards

On Mon, Sep 28, 2015 at 2:44 PM, Lokesh Kumar Padhnavis 
wrote:

> Thanks Akhil for the reply,
>
> I am using ant, and I placed the latest 1.5.0 jar file in my code. I am
> actually testing this on my laptop, so there are only two places to change:
> one in Spark itself and the other in my code.
> And I did that.
>
> On Mon, Sep 28, 2015 at 2:30 PM Akhil Das 
> wrote:
>
>> Update the dependency version in your job's build file. Also make sure you
>> have updated the Spark version to 1.5.0 everywhere (in the cluster and in
>> the code).
>>
>> Thanks
>> Best Regards
>>
>> On Mon, Sep 28, 2015 at 11:29 AM, lokeshkumar  wrote:
>>
>>> Hi forum
>>>
>>> I have just upgraded spark from 1.4.0 to 1.5.0 and am running my old
>>> (1.4.0)
>>> jobs on 1.5.0 using 'spark://ip:7077' cluster URL. But the job does not
>>> seem
>>> to start and errors out at server with below incompatible class exception
>>>
>>> 15/09/28 11:20:07 INFO Master: 10.0.0.195:34702 got disassociated,
>>> removing
>>> it.
>>> 15/09/28 11:20:07 WARN ReliableDeliverySupervisor: Association with
>>> remote
>>> system [akka.tcp://sparkDriver@10.0.0.195:34702] has failed, address is
>>> now
>>> gated for [5000] ms. Reason:
>>> [org.apache.spark.deploy.DeployMessages$RegisterApplication; local class
>>> incompatible: stream classdesc serialVersionUID = 352674063933172066,
>>> local
>>> class serialVersionUID = -5495080032843259921]
>>> 15/09/28 11:20:27 ERROR ErrorMonitor:
>>> org.apache.spark.deploy.DeployMessages$RegisterApplication; local class
>>> incompatible: stream classdesc serialVersionUID = 352674063933172066,
>>> local
>>> class serialVersionUID = -5495080032843259921
>>> java.io.InvalidClassException:
>>> org.apache.spark.deploy.DeployMessages$RegisterApplication; local class
>>> incompatible: stream classdesc serialVersionUID = 352674063933172066,
>>> local
>>> class serialVersionUID = -5495080032843259921
>>> at
>>> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:621)
>>> at
>>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
>>> at
>>> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>>> at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>> at
>>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>>> at
>>> akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>> at
>>> akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
>>> at
>>>
>>> akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
>>> at scala.util.Try$.apply(Try.scala:161)
>>> at
>>> akka.serialization.Serialization.deserialize(Serialization.scala:98)
>>> at
>>>
>>> akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:63)
>>> at
>>>
>>> akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
>>> at scala.util.Try$.apply(Try.scala:161)
>>> at
>>> akka.serialization.Serialization.deserialize(Serialization.scala:98)
>>> at
>>> akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
>>> at
>>>
>>> akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58)
>>> at
>>> akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58)
>>> at
>>> akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:76)
>>> at
>>>
>>> akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:935)
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>>> at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411)
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>> at
>>>
>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at
>>>
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at
>>>
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>

Re: Performance when iterating over many parquet files

2015-09-28 Thread Cheng Lian
I guess you're probably using Spark 1.5? Spark SQL does support schema 
merging, but we disabled it by default since 1.5 because it introduces 
extra performance costs (it's turned on by default in 1.4 and 1.3).


You may enable schema merging via either the Parquet data source 
specific option "mergeSchema":


  sqlContext.read.option("mergeSchema", "true").parquet(path)

or the global SQL option "spark.sql.parquet.mergeSchema":

  sqlContext.sql("SET spark.sql.parquet.mergeSchema=true")
  sqlContext.read.parquet(path)

Cheng

On 9/28/15 3:45 PM, jordan.tho...@accenture.com wrote:


Dear Michael,

Thank you very much for your help.

I should have mentioned in my original email, I did try the sequence 
notation.  It doesn’t seem to have the desired effect.  Maybe I should 
say that each one of these files has a different schema.  When I use 
that call, I’m not ending up with a data frame with columns from all 
of the files taken together, but just one of them.  I’m tracing 
through the code trying to understand exactly what is happening with 
the Seq[String] call.  Maybe you know?  Is it trying to do some kind 
of schema merging?


Also, it seems that even if I could get it to work, it would require 
some parsing of the resulting schemas to find the invalid files.  I 
would like to capture these errors on read.


The parquet files  currently average about 60 MB in size, with min 
about 40 MB and max about 500 or so.  I could coalesce, but they do 
correspond to logical entities and there are a number of use-case 
specific reasons to keep them separate.


Thanks,

Jordan

*From:*Michael Armbrust [mailto:mich...@databricks.com]
*Sent:* Monday, September 28, 2015 4:02 PM
*To:* Thomas, Jordan 
*Cc:* user 
*Subject:* Re: Performance when iterating over many parquet files

Another note: for best performance you are going to want your parquet 
files to be pretty big (100s of mb).  You could coalesce them and 
write them out for more efficient repeat querying.


On Mon, Sep 28, 2015 at 2:00 PM, Michael Armbrust wrote:

sqlContext.read.parquet takes lists of files.

// this works, but using Spark is possibly overkill
val fileList = sc.textFile("file_list.txt").collect()
val dataFrame = sqlContext.read.parquet(fileList: _*)
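
Since the comment above notes that using Spark just to read the file list is
possibly overkill, here is a minimal sketch of reading the list with plain file
I/O instead. It is written in Python and assumes the list is a local text file
with one path per line; the helper name read_file_list is hypothetical, not
part of any Spark API.

```python
def read_file_list(list_path):
    """Return the non-empty, whitespace-stripped lines of a text file."""
    with open(list_path) as handle:
        return [line.strip() for line in handle if line.strip()]


# Hypothetical PySpark usage (not executed here; assumes an existing sqlContext):
#   paths = read_file_list("file_list.txt")
#   data_frame = sqlContext.read.parquet(*paths)
```

Blank lines in the list are skipped so that a trailing newline does not turn
into an empty path.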

On Mon, Sep 28, 2015 at 1:35 PM, jwthomas wrote:

We are working with use cases where we need to do batch processing on a large
number (hundreds of thousands) of Parquet files.  The processing is quite
similar per file.  There are many aggregates that are very SQL-friendly
(computing averages, maxima, minima, aggregations on single columns with
some selection criteria).  There is also some processing that is more
advanced time-series processing (continuous wavelet transforms and the
like).  This all seems like a good use case for Spark.

But I'm having performance problems.  Let's take a look at something very
simple, which simply checks whether the parquet files are readable.

Code that seems natural but doesn't work:

import scala.util.{Try, Success, Failure}
val parquetFiles = sc.textFile("file_list.txt")
val successes = parquetFiles
  .map(x => (x, Try(sqlContext.read.parquet(x))))
  .filter(_._2.isSuccess)
  .map(x => x._1)

My understanding is that this doesn't work because sqlContext can't be used
inside of a transformation like "map" (or inside an action); it only
makes sense in the driver.  Thus, it becomes a null reference in the above
code, so all reads fail.

Code that works:

import scala.util.{Try, Success, Failure}
val parquetFiles = sc.textFile("file_list.txt")
val successes = parquetFiles.collect()
  .map(x => (x, Try(sqlContext.read.parquet(x))))
  .filter(_._2.isSuccess)
  .map(x => x._1)

This works because the collect() means that everything happens back on the
driver.  So the sqlContext object makes sense and everything works fine.

But it is slow.  I'm using yarn-client mode on a 6-node cluster with 17
executors, 40 GB RAM on the driver, 19 GB on executors.  And it takes about 1
minute to execute for 100 parquet files, which is too long.  Recall we need
to do this across hundreds of thousands of files.

I realize it is possible to parallelize after the read:

import scala.util.{Try, Success, Failure} val parquetFiles =
sc.textFile("file_list.txt") val 

RE: Performance when iterating over many parquet files

2015-09-28 Thread jordan.thomas
Ah, yes, I see that it has been turned off now, that’s why it wasn’t working.  
Thank you, this is helpful!  The problem now is to filter out bad (miswritten) 
Parquet files, as they are causing this operation to fail.

Any suggestions on detecting them quickly and easily?

From: Cheng Lian [mailto:lian.cs@gmail.com]
Sent: Monday, September 28, 2015 5:56 PM
To: Thomas, Jordan ; mich...@databricks.com
Cc: user@spark.apache.org
Subject: Re: Performance when iterating over many parquet files

Also, you may find more details in the programming guide:

- http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging
- http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration

Cheng
On 9/28/15 3:54 PM, Cheng Lian wrote:
I guess you're probably using Spark 1.5? Spark SQL does support schema merging, 
but we disabled it by default since 1.5 because it introduces extra performance 
costs (it's turned on by default in 1.4 and 1.3).

You may enable schema merging via either the Parquet data source specific 
option "mergeSchema":

  sqlContext.read.option("mergeSchema", "true").parquet(path)

or the global SQL option "spark.sql.parquet.mergeSchema":

  sqlContext.sql("SET spark.sql.parquet.mergeSchema=true")
  sqlContext.read.parquet(path)

Cheng
On 9/28/15 3:45 PM, jordan.tho...@accenture.com wrote:
Dear Michael,

Thank you very much for your help.

I should have mentioned in my original email, I did try the sequence notation.  
It doesn’t seem to have the desired effect.  Maybe I should say that each one 
of these files has a different schema.  When I use that call, I’m not ending up 
with a data frame with columns from all of the files taken together, but just 
one of them.  I’m tracing through the code trying to understand exactly what is 
happening with the Seq[String] call.  Maybe you know?  Is it trying to do some 
kind of schema merging?

Also, it seems that even if I could get it to work, it would require some 
parsing of the resulting schemas to find the invalid files.  I would like to 
capture these errors on read.

The parquet files  currently average about 60 MB in size, with min about 40 MB 
and max about 500 or so.  I could coalesce, but they do correspond to logical 
entities and there are a number of use-case specific reasons to keep them 
separate.

Thanks,
Jordan


From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Monday, September 28, 2015 4:02 PM
To: Thomas, Jordan 

Cc: user 
Subject: Re: Performance when iterating over many parquet files

Another note: for best performance you are going to want your parquet files to 
be pretty big (100s of mb).  You could coalesce them and write them out for 
more efficient repeat querying.

On Mon, Sep 28, 2015 at 2:00 PM, Michael Armbrust wrote:
sqlContext.read.parquet takes lists of files.

// this works, but using Spark is possibly overkill
val fileList = sc.textFile("file_list.txt").collect()
val dataFrame = sqlContext.read.parquet(fileList: _*)

On Mon, Sep 28, 2015 at 1:35 PM, jwthomas wrote:
We are working with use cases where we need to do batch processing on a large
number (hundreds of thousands) of Parquet files.  The processing is quite
similar per file.  There are many aggregates that are very SQL-friendly
(computing averages, maxima, minima, aggregations on single columns with
some selection criteria).  There is also some processing that is more
advanced time-series processing (continuous wavelet transforms and the
like).  This all seems like a good use case for Spark.

But I'm having performance problems.  Let's take a look at something very
simple, which simply checks whether the parquet files are readable.

Code that seems natural but doesn't work:

import scala.util.{Try, Success, Failure}
val parquetFiles = sc.textFile("file_list.txt")
val successes = parquetFiles
  .map(x => (x, Try(sqlContext.read.parquet(x))))
  .filter(_._2.isSuccess)
  .map(x => x._1)

My understanding is that this doesn't work because sqlContext can't be used
inside of a transformation like "map" (or inside an action).  That it only
makes sense in the driver.  Thus, it becomes a null reference in the above
code, so all reads fail.

Code that works:

import scala.util.{Try, Success, Failure}
val parquetFiles = sc.textFile("file_list.txt")
val successes = parquetFiles.collect()
  .map(x => (x, Try(sqlContext.read.parquet(x))))
  .filter(_._2.isSuccess)
  .map(x => x._1)


This works because the collect() means that everything happens back on the
driver.  So the sqlContext object makes sense and everything works fine.


Get variable into Spark's foreachRDD function

2015-09-28 Thread markluk
I have a streaming Spark process and I need to do some logging in the
`foreachRDD` function, but I'm having trouble accessing the logger as a
variable in the `foreachRDD` function

I would like to do the following

import logging

myLogger = logging.getLogger(LOGGER_NAME)
...
...
someData = 

someData.foreachRDD(lambda now, rdds : myLogger.info( ))

Inside the lambda, it cannot access `myLogger`. I get a giant stacktrace -
here is a snippet.


  File
"/juicero/press-mgmt/spark-1.5.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py",
line 537, in save_reduce
save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
  File
"/juicero/press-mgmt/spark-1.5.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py",
line 315, in save_builtin_function
return self.save_function(obj)
  File
"/juicero/press-mgmt/spark-1.5.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py",
line 191, in save_function
if islambda(obj) or obj.__code__.co_filename == '' or
themodule is None:
AttributeError: 'builtin_function_or_method' object has no attribute
'__code__'



I don't understand why I can't access `myLogger`. Does it have something to
do with Spark not being able to serialize this logger object?
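
One common workaround, assuming the failure does come from pickling the logger
captured by the lambda, is to look the logger up by name inside the function
body so that nothing but plain names needs to be serialized. The sketch below
is illustrative only: the logger name and the function `log_batch` are
hypothetical, and the record count is just an example of something to log.

```python
import logging

LOGGER_NAME = "streaming_app"  # hypothetical; stands in for the poster's LOGGER_NAME


def log_batch(batch_time, rdd):
    """Log one line per batch; the logger is looked up here, not captured."""
    # logging.getLogger returns the same logger object for the same name,
    # so no logger instance has to travel inside the function's closure.
    logger = logging.getLogger(LOGGER_NAME)
    count = rdd.count()
    logger.info("batch %s contained %d records", batch_time, count)
    return count


# In the streaming job this would be registered as (not executed here):
#   someData.foreachRDD(log_batch)
```

A two-argument function passed to `foreachRDD` receives the batch time and the
RDD, which is why `log_batch` takes both.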



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Get-variable-into-Spark-s-foreachRDD-function-tp24852.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: Performance when iterating over many parquet files

2015-09-28 Thread jordan.thomas
Sure.  I would just like to remove the ones that fail the basic checks done by the 
Parquet readFooters function, in that their length is wrong or their magic number is 
incorrect, which throws exceptions in the read method.

Errors like:

java.io.IOException: Could not read footer: java.lang.RuntimeException: 
data.parquet is not a Parquet file (too small)

and

java.io.IOException: Could not read footer: java.lang.RuntimeException: 
data.parquet is not a Parquet file. expected magic number at tail [80, 65, 82, 
49] but found [54, -4, -10, -102]
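
The two failures above (file too short, wrong magic number at the tail) can be
checked cheaply without parsing the footer. Below is a minimal Python sketch
under stated assumptions: the files are reachable through the local filesystem
(HDFS would need a different client), and the function names are hypothetical.
The expected tail bytes [80, 65, 82, 49] are the ASCII string "PAR1", and the
smallest valid file holds a leading magic, a 4-byte footer length, and a
trailing magic.

```python
import os

PARQUET_MAGIC = b"PAR1"  # the bytes [80, 65, 82, 49] from the error message
# Smallest possible file: leading magic + 4-byte footer length + trailing magic.
MIN_PARQUET_SIZE = len(PARQUET_MAGIC) + 4 + len(PARQUET_MAGIC)


def looks_like_parquet(path):
    """Cheap sanity check: long enough and ends with the magic bytes.

    This does not validate the footer contents; it only mirrors the two
    conditions named in the exceptions above.
    """
    if os.path.getsize(path) < MIN_PARQUET_SIZE:
        return False  # corresponds to "too small"
    with open(path, "rb") as handle:
        handle.seek(-len(PARQUET_MAGIC), os.SEEK_END)
        return handle.read(len(PARQUET_MAGIC)) == PARQUET_MAGIC


def find_suspect_files(paths):
    """Return the paths that fail the check, e.g. candidates for re-transfer."""
    return [p for p in paths if not looks_like_parquet(p)]
```

Because only the file size and the last four bytes are read, this kind of scan
can run before any Spark job is started; files that pass may still be corrupt
in other ways.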

Backstory:  We had a migration from one cluster to another and thousands of 
incomplete files were transferred.  In addition, they are still trying to 
handle the kickouts from their write methods (they are converting from a 
proprietary binary format).  A lot of that is captured in the Splunk logs and 
will improve in the coming weeks as they continue tuning, but on the reading 
end I want to make sure we’re in sync about what needs to be re-converted and 
re-transferred.


Thanks,
Jordan

From: Cheng Lian [mailto:lian.cs@gmail.com]
Sent: Monday, September 28, 2015 6:15 PM
To: Thomas, Jordan ; mich...@databricks.com
Cc: user@spark.apache.org
Subject: Re: Performance when iterating over many parquet files

Could you please elaborate on what kind of errors those bad Parquet files are 
causing? In what ways are they miswritten?

Cheng
On 9/28/15 4:03 PM, jordan.tho...@accenture.com wrote:
Ah, yes, I see that it has been turned off now, that’s why it wasn’t working.  
Thank you, this is helpful!  The problem now is to filter out bad (miswritten) 
Parquet files, as they are causing this operation to fail.

Any suggestions on detecting them quickly and easily?

From: Cheng Lian [mailto:lian.cs@gmail.com]
Sent: Monday, September 28, 2015 5:56 PM
To: Thomas, Jordan ; mich...@databricks.com
Cc: user@spark.apache.org
Subject: Re: Performance when iterating over many parquet files

Also, you may find more details in the programming guide:

- http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging
- http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration

Cheng
On 9/28/15 3:54 PM, Cheng Lian wrote:
I guess you're probably using Spark 1.5? Spark SQL does support schema merging, 
but we disabled it by default since 1.5 because it introduces extra performance 
costs (it's turned on by default in 1.4 and 1.3).

You may enable schema merging via either the Parquet data source specific 
option "mergeSchema":

  sqlContext.read.option("mergeSchema", "true").parquet(path)

or the global SQL option "spark.sql.parquet.mergeSchema":

  sqlContext.sql("SET spark.sql.parquet.mergeSchema=true")
  sqlContext.read.parquet(path)

Cheng
On 9/28/15 3:45 PM, jordan.tho...@accenture.com wrote:
Dear Michael,

Thank you very much for your help.

I should have mentioned in my original email, I did try the sequence notation.  
It doesn’t seem to have the desired effect.  Maybe I should say that each one 
of these files has a different schema.  When I use that call, I’m not ending up 
with a data frame with columns from all of the files taken together, but just 
one of them.  I’m tracing through the code trying to understand exactly what is 
happening with the Seq[String] call.  Maybe you know?  Is it trying to do some 
kind of schema merging?

Also, it seems that even if I could get it to work, it would require some 
parsing of the resulting schemas to find the invalid files.  I would like to 
capture these errors on read.

The parquet files  currently average about 60 MB in size, with min about 40 MB 
and max about 500 or so.  I could coalesce, but they do correspond to logical 
entities and there are a number of use-case specific reasons to keep them 
separate.

Thanks,
Jordan


From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Monday, September 28, 2015 4:02 PM
To: Thomas, Jordan 

Cc: user 
Subject: Re: Performance when iterating over many parquet files

Another note: for best performance you are going to want your parquet files to 
be pretty big (100s of mb).  You could coalesce them and write them out for 
more efficient repeat querying.

On Mon, Sep 28, 2015 at 2:00 PM, Michael Armbrust wrote:
sqlContext.read.parquet takes lists of files.

// this works, but using Spark is possibly overkill
val fileList = sc.textFile("file_list.txt").collect()
val dataFrame = sqlContext.read.parquet(fileList: _*)

On Mon, Sep 28, 2015 at 

RE: java.lang.ClassCastException (org.apache.spark.scheduler.ResultTask cannot be cast to org.apache.spark.scheduler.Task)

2015-09-28 Thread Amit Yadav



Thank you Ted. I followed SPARK-8142 and removed the spark-core and hadoop jars 
from my application uber jar. I am able to get past this error now. 
Amit.

Date: Mon, 28 Sep 2015 14:11:18 -0700
Subject: Re: java.lang.ClassCastException 
(org.apache.spark.scheduler.ResultTask cannot be cast to 
org.apache.spark.scheduler.Task)
From: yuzhih...@gmail.com
To: amit...@hotmail.com
CC: user@spark.apache.org

Please see SPARK-8142
On Mon, Sep 28, 2015 at 1:45 PM, amitra123  wrote:
Hello All,

I am trying to write a very simple Spark Streaming example problem and I am
getting this exception. I am new to Spark and I am not quite sure why this
exception is thrown. Wondering if someone has any clues. Here is the
backtrace. I am running this on Spark 1.5.0.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in
stage 0.0 failed 4 times, most recent failure: Lost task 9.3 in stage 0.0
(TID 17, 112.XXX.XXX.XXX): java.lang.ClassCastException:
org.apache.spark.scheduler.ResultTask cannot be cast to
org.apache.spark.scheduler.Task
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)



Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:898)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:896)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:896)
at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:222)
at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:47)
at com.ay.spark.example.SparkWordCountStreaming.outputRdd(SparkWordCountStreaming.java:172)
at com.ay.spark.example.SparkWordCountStreaming.lambda$start$b852b88$1(SparkWordCountStreaming.java:123)
at com.ay.spark.example.SparkWordCountStreaming$$Lambda$9/1640832113.call(Unknown Source)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:315)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:315)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)

at


Monitoring tools for spark streaming

2015-09-28 Thread Siva
Hi,

Could someone recommend the monitoring tools for spark streaming?

By extending StreamingListener we can dump the delay in processing of
batches and some alert messages.

But are there any Web UI tools where we can monitor failures, see delays in
processing and error messages, set up alerts, etc.?

Thanks


Does YARN start new executor in place of the failed one?

2015-09-28 Thread Alexander Pivovarov
Hello Everyone

I use Spark on YARN on EMR-4.

The Spark program which I run has several jobs/stages and runs for about 10
hours.
During the execution some executors might fail for some reason,
but I do not see new executors being started in place of the failed ones.

So, what I see in the Spark UI is that at the beginning of my program I have
100 executors, but after 10 hours I see only 67 executors.

I remember that in Standalone mode the Spark Worker starts a new executor in
place of a failed one automatically.

How do I activate the same behavior on YARN?

The only non-default YARN setting I use are the following:
yarn.nodemanager.pmem-check-enabled=false
yarn.nodemanager.vmem-check-enabled=false

Thank you
Alex


Re: Performance when iterating over many parquet files

2015-09-28 Thread Cheng Lian

Also, you may find more details in the programming guide:

- 
http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging
- 
http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration


Cheng

On 9/28/15 3:54 PM, Cheng Lian wrote:
I guess you're probably using Spark 1.5? Spark SQL does support schema 
merging, but we disabled it by default since 1.5 because it introduces 
extra performance costs (it's turned on by default in 1.4 and 1.3).


You may enable schema merging via either the Parquet data source 
specific option "mergeSchema":


  sqlContext.read.option("mergeSchema", "true").parquet(path)

or the global SQL option "spark.sql.parquet.mergeSchema":

  sqlContext.sql("SET spark.sql.parquet.mergeSchema=true")
  sqlContext.read.parquet(path)

Cheng

On 9/28/15 3:45 PM, jordan.tho...@accenture.com wrote:


Dear Michael,

Thank you very much for your help.

I should have mentioned in my original email, I did try the sequence 
notation.  It doesn’t seem to have the desired effect.  Maybe I 
should say that each one of these files has a different schema.  When 
I use that call, I’m not ending up with a data frame with columns 
from all of the files taken together, but just one of them.  I’m 
tracing through the code trying to understand exactly what is 
happening with the Seq[String] call.  Maybe you know?  Is it trying 
to do some kind of schema merging?


Also, it seems that even if I could get it to work, it would require 
some parsing of the resulting schemas to find the invalid files.  I 
would like to capture these errors on read.


The parquet files  currently average about 60 MB in size, with min 
about 40 MB and max about 500 or so.  I could coalesce, but they do 
correspond to logical entities and there are a number of use-case 
specific reasons to keep them separate.


Thanks,

Jordan

*From:*Michael Armbrust [mailto:mich...@databricks.com]
*Sent:* Monday, September 28, 2015 4:02 PM
*To:* Thomas, Jordan 
*Cc:* user 
*Subject:* Re: Performance when iterating over many parquet files

Another note: for best performance you are going to want your parquet 
files to be pretty big (100s of mb).  You could coalesce them and 
write them out for more efficient repeat querying.


On Mon, Sep 28, 2015 at 2:00 PM, Michael Armbrust wrote:

sqlContext.read.parquet takes lists of files.

// this works, but using Spark is possibly overkill
val fileList = sc.textFile("file_list.txt").collect()
val dataFrame = sqlContext.read.parquet(fileList: _*)

On Mon, Sep 28, 2015 at 1:35 PM, jwthomas wrote:

We are working with use cases where we need to do batch
processing on a large
number (hundreds of thousands) of Parquet files.  The
processing is quite
similar per file.  There are many aggregates that are very
SQL-friendly
(computing averages, maxima, minima, aggregations on single
columns with
some selection criteria).  There is also some processing
that is more
advanced time-series processing (continuous wavelet
transforms and the
like).  This all seems like a good use case for Spark.

But I'm having performance problems.  Let's take a look at
something very
simple, which simply checks whether the parquet files are
readable.

Code that seems natural but doesn't work:

import scala.util.{Try, Success, Failure}
val parquetFiles = sc.textFile("file_list.txt")
val successes = parquetFiles
  .map(x => (x, Try(sqlContext.read.parquet(x))))
  .filter(_._2.isSuccess)
  .map(x => x._1)

My understanding is that this doesn't work because sqlContext
can't be used
inside of a transformation like "map" (or inside an action). 
That it only

makes sense in the driver.  Thus, it becomes a null reference
in the above
code, so all reads fail.

Code that works:

import scala.util.{Try, Success, Failure}
val parquetFiles = sc.textFile("file_list.txt")
val successes = parquetFiles.collect()
  .map(x => (x, Try(sqlContext.read.parquet(x))))
  .filter(_._2.isSuccess)
  .map(x => x._1)


This works because the collect() means that everything
happens back on the
driver.  So the sqlContext object makes sense and everything
works fine.

But it is slow.  I'm using yarn-client mode on a 6-node
cluster with 17
executors, 40 GB ram on driver, 19GB on executors.  And it
takes about 1
minute to execute for 100 parquet files. Which is too 

Re: Performance when iterating over many parquet files

2015-09-28 Thread Cheng Lian
Could you please elaborate on what kind of errors those bad Parquet 
files are causing? In what ways are they miswritten?


Cheng

On 9/28/15 4:03 PM, jordan.tho...@accenture.com wrote:


Ah, yes, I see that it has been turned off now, that’s why it wasn’t 
working.  Thank you, this is helpful!  The problem now is to filter 
out bad (miswritten) Parquet files, as they are causing this operation 
to fail.


Any suggestions on detecting them quickly and easily?

*From:*Cheng Lian [mailto:lian.cs@gmail.com]
*Sent:* Monday, September 28, 2015 5:56 PM
*To:* Thomas, Jordan ; mich...@databricks.com
*Cc:* user@spark.apache.org
*Subject:* Re: Performance when iterating over many parquet files

Also, you may find more details in the programming guide:

- 
http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging
- 
http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration


Cheng

On 9/28/15 3:54 PM, Cheng Lian wrote:

I guess you're probably using Spark 1.5? Spark SQL does support
schema merging, but we disabled it by default since 1.5 because it
introduces extra performance costs (it's turned on by default in
1.4 and 1.3).

You may enable schema merging via either the Parquet data source
specific option "mergeSchema":

  sqlContext.read.option("mergeSchema", "true").parquet(path)

or the global SQL option "spark.sql.parquet.mergeSchema":

  sqlContext.sql("SET spark.sql.parquet.mergeSchema=true")
  sqlContext.read.parquet(path)

Cheng

On 9/28/15 3:45 PM, jordan.tho...@accenture.com wrote:

Dear Michael,

Thank you very much for your help.

I should have mentioned in my original email, I did try the
sequence notation.  It doesn’t seem to have the desired
effect.  Maybe I should say that each one of these files has a
different schema.  When I use that call, I’m not ending up
with a data frame with columns from all of the files taken
together, but just one of them.  I’m tracing through the code
trying to understand exactly what is happening with the
Seq[String] call. Maybe you know?  Is it trying to do some
kind of schema merging?

Also, it seems that even if I could get it to work, it would
require some parsing of the resulting schemas to find the
invalid files.  I would like to capture these errors on read.

The parquet files  currently average about 60 MB in size, with
min about 40 MB and max about 500 or so.  I could coalesce,
but they do correspond to logical entities and there are a
number of use-case specific reasons to keep them separate.

Thanks,

Jordan

*From:*Michael Armbrust [mailto:mich...@databricks.com]
*Sent:* Monday, September 28, 2015 4:02 PM
*To:* Thomas, Jordan 

*Cc:* user  
*Subject:* Re: Performance when iterating over many parquet files

Another note: for best performance you are going to want your
parquet files to be pretty big (100s of mb).  You could
coalesce them and write them out for more efficient repeat
querying.

On Mon, Sep 28, 2015 at 2:00 PM, Michael Armbrust
> wrote:

sqlContext.read.parquet


takes lists of files.

// this works but using spark is possibly overkill
val fileList = sc.textFile("file_list.txt").collect()
val dataFrame = sqlContext.read.parquet(fileList: _*)

On Mon, Sep 28, 2015 at 1:35 PM, jwthomas
> wrote:

We are working with use cases where we need to do
batch processing on a large
number (hundreds of thousands) of Parquet files.  The
processing is quite
similar per file.  There are many aggregates that
are very SQL-friendly
(computing averages, maxima, minima, aggregations on
single columns with
some selection criteria).  There is also some
processing that is more
advanced time-series processing (continuous wavelet
transforms and the
like).  This all seems like a good use case for Spark.

But I'm having performance problems. Let's take a look
at something very
simple, which simply checks whether the parquet files
are 

Re: Performance when iterating over many parquet files

2015-09-28 Thread Cheng Lian

parquet-tools and the following shell script may help:

root="/path/to/your/data"

for f in `find $root -type f -name "*.parquet"`; do
  # a non-zero exit status is taken to mean the file could not be read
  parquet-schema $f > /dev/null 2>&1
  if [ $? -ne 0 ]; then echo $f; fi
done

This should print out all non-Parquet files under $root. Please refer to 
this link to see how to build and install parquet-tools 
https://github.com/Parquet/parquet-mr/issues/321


Cheng

On 9/28/15 4:29 PM, jordan.tho...@accenture.com wrote:


Sure. I would just like to remove ones that fail the basic checks 
done by the Parquet readFooters function, in that their length is 
wrong or magic number is incorrect, which throws exceptions in the 
read method.


Errors like:

java.io.IOException: Could not read footer: 
java.lang.RuntimeException: data.parquet is not a Parquet file (too small)


and

java.io.IOException: Could not read footer: 
java.lang.RuntimeException: data.parquet is not a Parquet file. 
expected magic number at tail [80, 65, 82, 49] but found [54, -4, -10, 
-102]
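
The two failure modes above (file too small, wrong magic number at the tail) could be pre-checked without parsing the footer at all. The following is only a minimal sketch: `ParquetMagicCheck` and `looksLikeParquet` are hypothetical names, and it assumes files on a local filesystem (for HDFS the same check would have to go through the Hadoop FileSystem API instead). It only tests the length and the trailing "PAR1" bytes, so a file with a valid tail but a corrupt footer would still pass.

```scala
import java.io.RandomAccessFile
import java.nio.charset.StandardCharsets
import scala.util.Try

// Hypothetical helper, not part of Spark or parquet-mr.
object ParquetMagicCheck {
  // "PAR1" in ASCII is [80, 65, 82, 49], the magic number from the error message.
  val Magic: Array[Byte] = "PAR1".getBytes(StandardCharsets.US_ASCII)

  // Smallest possible file: head magic + 4-byte footer length + tail magic.
  val MinLength: Long = Magic.length + 4 + Magic.length

  /** True if the local file is long enough and ends with the PAR1 magic.
    * Any I/O error (for example a missing file) counts as "not Parquet". */
  def looksLikeParquet(localPath: String): Boolean = Try {
    val file = new RandomAccessFile(localPath, "r")
    try {
      val len = file.length()
      if (len < MinLength) false
      else {
        // Read only the last four bytes of the file.
        val tail = new Array[Byte](Magic.length)
        file.seek(len - Magic.length)
        file.readFully(tail)
        java.util.Arrays.equals(tail, Magic)
      }
    } finally file.close()
  }.getOrElse(false)
}
```

Because it reads four bytes per file, a check like this may be useful as a first pass before a full footer read.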


Backstory: We had a migration from one cluster to another and 
thousands of incomplete files were transferred.  In addition, they are 
still trying to handle the kickouts from their write methods (they are 
converting from a proprietary binary format).  A lot of that is 
captured in the Splunk logs and will improve in the coming weeks as 
they continue tuning, but on the reading end I want to make sure we’re 
in sync about what needs to be re-converted and re-transferred.


Thanks,

Jordan

*From:*Cheng Lian [mailto:lian.cs@gmail.com]
*Sent:* Monday, September 28, 2015 6:15 PM
*To:* Thomas, Jordan ; mich...@databricks.com
*Cc:* user@spark.apache.org
*Subject:* Re: Performance when iterating over many parquet files

Could you please elaborate on what kind of errors those bad 
Parquet files are causing? In what ways are they miswritten?


Cheng

On 9/28/15 4:03 PM, jordan.tho...@accenture.com 
 wrote:


Ah, yes, I see that it has been turned off now, that’s why it
wasn’t working.  Thank you, this is helpful!  The problem now is
to filter out bad (miswritten) Parquet files, as they are causing
this operation to fail.

Any suggestions on detecting them quickly and easily?

*From:*Cheng Lian [mailto:lian.cs@gmail.com]
*Sent:* Monday, September 28, 2015 5:56 PM
*To:* Thomas, Jordan 
; mich...@databricks.com

*Cc:* user@spark.apache.org 
*Subject:* Re: Performance when iterating over many parquet files

Also, you may find more details in the programming guide:

-

http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging
-
http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration

Cheng

On 9/28/15 3:54 PM, Cheng Lian wrote:

I guess you're probably using Spark 1.5? Spark SQL does
support schema merging, but we disabled it by default since
1.5 because it introduces extra performance costs (it's turned
on by default in 1.4 and 1.3).

You may enable schema merging via either the Parquet data
source specific option "mergeSchema":

  sqlContext.read.option("mergeSchema", "true").parquet(path)

or the global SQL option "spark.sql.parquet.mergeSchema":

  sqlContext.sql("SET spark.sql.parquet.mergeSchema=true")
  sqlContext.read.parquet(path)

Cheng

On 9/28/15 3:45 PM, jordan.tho...@accenture.com
 wrote:

Dear Michael,

Thank you very much for your help.

I should have mentioned in my original email, I did try
the sequence notation.  It doesn’t seem to have the
desired effect.  Maybe I should say that each one of these
files has a different schema.  When I use that call, I’m
not ending up with a data frame with columns from all of
the files taken together, but just one of them.  I’m
tracing through the code trying to understand exactly what
is happening with the Seq[String] call.  Maybe you know? 
Is it trying to do some kind of schema merging?


Also, it seems that even if I could get it to work, it
would require some parsing of the resulting schemas to
find the invalid files.  I would like to capture these
errors on read.

The parquet files  currently average about 60 MB in size,
with min about 40 MB and max about 500 or so.  I could
coalesce, but they do correspond to logical entities and
there are a number of use-case specific reasons to keep
them separate.

Thanks,

Jordan

*From:*Michael Armbrust 

RE: Performance when iterating over many parquet files

2015-09-28 Thread jordan.thomas
Dear Michael,

Thank you very much for your help.

I should have mentioned in my original email, I did try the sequence notation.  
It doesn’t seem to have the desired effect.  Maybe I should say that each one 
of these files has a different schema.  When I use that call, I’m not ending up 
with a data frame with columns from all of the files taken together, but just 
one of them.  I’m tracing through the code trying to understand exactly what is 
happening with the Seq[String] call.  Maybe you know?  Is it trying to do some 
kind of schema merging?

Also, it seems that even if I could get it to work, it would require some 
parsing of the resulting schemas to find the invalid files.  I would like to 
capture these errors on read.

The parquet files  currently average about 60 MB in size, with min about 40 MB 
and max about 500 or so.  I could coalesce, but they do correspond to logical 
entities and there are a number of use-case specific reasons to keep them 
separate.

Thanks,
Jordan


From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Monday, September 28, 2015 4:02 PM
To: Thomas, Jordan 
Cc: user 
Subject: Re: Performance when iterating over many parquet files

Another note: for best performance you are going to want your parquet files to 
be pretty big (100s of mb).  You could coalesce them and write them out for 
more efficient repeat querying.

On Mon, Sep 28, 2015 at 2:00 PM, Michael Armbrust 
> wrote:
sqlContext.read.parquet
 takes lists of files.

// this works but using spark is possibly overkill
val fileList = sc.textFile("file_list.txt").collect()
val dataFrame = sqlContext.read.parquet(fileList: _*)

On Mon, Sep 28, 2015 at 1:35 PM, jwthomas 
> wrote:
We are working with use cases where we need to do batch processing on a large
number (hundreds of thousands) of Parquet files.  The processing is quite
similar per file.  There are many aggregates that are very SQL-friendly
(computing averages, maxima, minima, aggregations on single columns with
some selection criteria).  There is also some processing that is more
advanced time-series processing (continuous wavelet transforms and the
like).  This all seems like a good use case for Spark.

But I'm having performance problems.  Let's take a look at something very
simple, which simply checks whether the parquet files are readable.

Code that seems natural but doesn't work:

import scala.util.{Try, Success, Failure}
val parquetFiles = sc.textFile("file_list.txt")
val successes = parquetFiles.map(x => (x, Try(sqlContext.read.parquet(x))))
  .filter(_._2.isSuccess).map(x => x._1)

My understanding is that this doesn't work because sqlContext can't be used
inside of a transformation like "map" (or inside an action); it only
makes sense in the driver.  Thus, it becomes a null reference in the above
code, so all reads fail.

Code that works:

import scala.util.{Try, Success, Failure}
val parquetFiles = sc.textFile("file_list.txt")
val successes = parquetFiles.collect().map(x => (x, Try(sqlContext.read.parquet(x))))
  .filter(_._2.isSuccess).map(x => x._1)


This works because the collect() means that everything happens back on the
driver.  So the sqlContext object makes sense and everything works fine.

But it is slow.  I'm using yarn-client mode on a 6-node cluster with 17
executors, 40 GB ram on driver, 19GB on executors.  And it takes about 1
minute to execute for 100 parquet files.  Which is too long.  Recall we need
to do this across hundreds of thousands of files.

I realize it is possible to parallelize after the read:

import scala.util.{Try, Success, Failure}
val parquetFiles = sc.textFile("file_list.txt")
val intermediate_successes = parquetFiles.collect().map(x => (x, Try(sqlContext.read.parquet(x))))
val dist_successes = sc.parallelize(intermediate_successes)
val successes = dist_successes.filter(_._2.isSuccess).map(x => x._1)


But this does not improve performance substantially.  It seems the
bottleneck is that the reads are happening sequentially.
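
One way to attack the sequential reads while staying on the driver is to issue them from several threads. The following is a minimal sketch, not a tested Spark recipe: `ParallelCheck` and `readablePaths` are hypothetical names, the thread count is arbitrary, and the `read` function stands in for whatever per-file check is used.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.util.Try

// Hypothetical helper, not part of Spark.
object ParallelCheck {
  /** Runs `read` on every path using `threads` driver-side threads and
    * returns the paths for which it did not throw, in input order. */
  def readablePaths(paths: Seq[String], threads: Int)(read: String => Any): Seq[String] = {
    val pool = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      // One future per path; Try turns an exception into a failed attempt.
      val attempts = paths.map(p => Future((p, Try(read(p)).isSuccess)))
      Await.result(Future.sequence(attempts), Duration.Inf).collect { case (p, true) => p }
    } finally pool.shutdown()
  }
}
```

With the code from this post, the call would look something like ParallelCheck.readablePaths(fileList, 16)(p => sqlContext.read.parquet(p)). That usage assumes sqlContext can safely be shared across driver threads, which this thread does not confirm.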

Is there a better way to do this?

Thanks,
Jordan




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Performance-when-iterating-over-many-parquet-files-tp24850.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.org






nested collection object query

2015-09-28 Thread tridib
Hi Friends,
What is the right syntax to query a collection of nested objects? I have the
following schema and SQL, but the query does not return anything. Is the syntax
correct?

root
 |-- id: string (nullable = false)
 |-- employee: array (nullable = false)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = false)
 |    |    |-- name: string (nullable = false)
 |    |    |-- speciality: string (nullable = false)


select id from member where employee.name = 'employee0'

I uploaded a test in case someone wants to try it out: NestedObjectTest.java

  




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/nested-collection-object-query-tp24853.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: Performance when iterating over many parquet files

2015-09-28 Thread jordan.thomas
Ok  thanks.  Actually we ran something very similar this weekend.  It works but 
is very slow.

The Spark method I included in my original post is about 5-6 times faster.  
Just wondering if there is something even faster than that.  I see this as 
being a recurring problem over the next few months.

From: Cheng Lian [mailto:lian.cs@gmail.com]
Sent: Monday, September 28, 2015 6:46 PM
To: Thomas, Jordan ; mich...@databricks.com
Cc: user@spark.apache.org
Subject: Re: Performance when iterating over many parquet files

parquet-tools and the following shell script may help:

root="/path/to/your/data"

for f in `find $root -type f -name "*.parquet"`; do
  # a non-zero exit status is taken to mean the file could not be read
  parquet-schema $f > /dev/null 2>&1
  if [ $? -ne 0 ]; then echo $f; fi
done

This should print out all non-Parquet files under $root. Please refer to this 
link to see how to build and install parquet-tools 
https://github.com/Parquet/parquet-mr/issues/321

Cheng
On 9/28/15 4:29 PM, 
jordan.tho...@accenture.com wrote:
Sure.  I would just like to remove ones that fail the basic checks done by the 
Parquet readFooters function, in that their length is wrong or magic number is 
incorrect, which throws exceptions in the read method.

Errors like:

java.io.IOException: Could not read footer: java.lang.RuntimeException: 
data.parquet is not a Parquet file (too small)

and

java.io.IOException: Could not read footer: java.lang.RuntimeException: 
data.parquet is not a Parquet file. expected magic number at tail [80, 65, 82, 
49] but found [54, -4, -10, -102]

Backstory:  We had a migration from one cluster to another and thousands of 
incomplete files were transferred.  In addition, they are still trying to 
handle the kickouts from their write methods (they are converting from a 
proprietary binary format).  A lot of that is captured in the Splunk logs and 
will improve in the coming weeks as they continue tuning, but on the reading 
end I want to make sure we’re in sync about what needs to be re-converted and 
re-transferred.


Thanks,
Jordan

From: Cheng Lian [mailto:lian.cs@gmail.com]
Sent: Monday, September 28, 2015 6:15 PM
To: Thomas, Jordan 
; 
mich...@databricks.com
Cc: user@spark.apache.org
Subject: Re: Performance when iterating over many parquet files

Could you please elaborate on what kind of errors those bad Parquet files 
are causing? In what ways are they miswritten?

Cheng
On 9/28/15 4:03 PM, 
jordan.tho...@accenture.com wrote:
Ah, yes, I see that it has been turned off now, that’s why it wasn’t working.  
Thank you, this is helpful!  The problem now is to filter out bad (miswritten) 
Parquet files, as they are causing this operation to fail.

Any suggestions on detecting them quickly and easily?

From: Cheng Lian [mailto:lian.cs@gmail.com]
Sent: Monday, September 28, 2015 5:56 PM
To: Thomas, Jordan 
; 
mich...@databricks.com
Cc: user@spark.apache.org
Subject: Re: Performance when iterating over many parquet files

Also, you may find more details in the programming guide:

- http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging
- http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration

Cheng
On 9/28/15 3:54 PM, Cheng Lian wrote:
I guess you're probably using Spark 1.5? Spark SQL does support schema merging, 
but we disabled it by default since 1.5 because it introduces extra performance 
costs (it's turned on by default in 1.4 and 1.3).

You may enable schema merging via either the Parquet data source specific 
option "mergeSchema":

  sqlContext.read.option("mergeSchema", "true").parquet(path)

or the global SQL option "spark.sql.parquet.mergeSchema":

  sqlContext.sql("SET spark.sql.parquet.mergeSchema=true")
  sqlContext.read.parquet(path)

Cheng
On 9/28/15 3:45 PM, 
jordan.tho...@accenture.com wrote:
Dear Michael,

Thank you very much for your help.

I should have mentioned in my original email, I did try the sequence notation.  
It doesn’t seem to have the desired effect.  Maybe I should say that each one 
of these files has a different schema.  When I use that call, I’m not ending up 
with a data frame with columns from all of the files taken together, but just 
one of them.  I’m tracing through the code trying to understand exactly what is 
happening with the Seq[String] call.  Maybe you know?  Is it trying to do some 
kind of schema merging?

Also, it seems that even if I could get it to work, it would require some 
parsing of the resulting schemas to find the invalid files.  I would like to 
capture these errors on read.

The parquet files  currently average about 60 MB in size, with min about 40 MB 

Re: SQL queries in Spark / YARN

2015-09-28 Thread Robert Grandl
 Thanks Mark. Do you know how? In Spark standalone mode I use beeline to 
submit SQL scripts. 

In Spark/YARN, the only way I can see this working is using spark-submit. 
However, it looks like I need to encapsulate the SQL queries in a Scala file. Do 
you have other suggestions?
Thanks,
Robert



 On Monday, September 28, 2015 2:47 PM, Mark Hamstra 
 wrote:
   

 Yes.
On Mon, Sep 28, 2015 at 12:46 PM, Robert Grandl  
wrote:

Hi guys,
I was wondering if it's possible to submit SQL queries to Spark SQL, when Spark 
is running atop YARN instead of standalone mode. 

Thanks,Robert



   

Re: Performance when iterating over many parquet files

2015-09-28 Thread Cheng Lian
Oh I see, then probably this one, basically the parallel Spark version 
of my last script, using ParquetFileReader:


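import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.format.converter.ParquetMetadataConverter

// `paths` is assumed to be a Seq[String] of file paths
val badFiles = sc.parallelize(paths).mapPartitions { iterator =>
  // One Hadoop Configuration per partition instead of one per file
  val conf = new Configuration()
  iterator.filter { path =>
    // Read only the footer; a failure marks the file as bad
    Try(ParquetFileReader.readFooter(
      conf, new Path(path), ParquetMetadataConverter.SKIP_ROW_GROUPS)).isFailure
  }
}.collect()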


Cheng

On 9/28/15 4:48 PM, jordan.tho...@accenture.com wrote:


Ok thanks.  Actually we ran something very similar this weekend.  It 
works but is very slow.


The Spark method I included in my original post is about 5-6 times 
faster.  Just wondering if there is something even faster than that.  
I see this as being a recurring problem over the next few months.


*From:*Cheng Lian [mailto:lian.cs@gmail.com]
*Sent:* Monday, September 28, 2015 6:46 PM
*To:* Thomas, Jordan ; mich...@databricks.com
*Cc:* user@spark.apache.org
*Subject:* Re: Performance when iterating over many parquet files

parquet-tools and the following shell script may help:

root="/path/to/your/data"

for f in `find $root -type f -name "*.parquet"`; do
  # a non-zero exit status is taken to mean the file could not be read
  parquet-schema $f > /dev/null 2>&1
  if [ $? -ne 0 ]; then echo $f; fi
done

This should print out all non-Parquet files under $root. Please refer 
to this link to see how to build and install parquet-tools 
https://github.com/Parquet/parquet-mr/issues/321


Cheng

On 9/28/15 4:29 PM, jordan.tho...@accenture.com 
 wrote:


Sure. I would just like to remove ones that fail the basic checks
done by the Parquet readFooters function, in that their length is
wrong or magic number is incorrect, which throws exceptions in the
read method.

Errors like:

java.io.IOException: Could not read footer:
java.lang.RuntimeException: data.parquet is not a Parquet file
(too small)

and

java.io.IOException: Could not read footer:
java.lang.RuntimeException: data.parquet is not a Parquet file.
expected magic number at tail [80, 65, 82, 49] but found [54, -4,
-10, -102]

Backstory: We had a migration from one cluster to another and
thousands of incomplete files were transferred.  In addition, they
are still trying to handle the kickouts from their write methods
(they are converting from a proprietary binary format).  A lot of
that is captured in the Splunk logs and will improve in the coming
weeks as they continue tuning, but on the reading end I want to
make sure we’re in sync about what needs to be re-converted and
re-transferred.

Thanks,

Jordan

*From:*Cheng Lian [mailto:lian.cs@gmail.com]
*Sent:* Monday, September 28, 2015 6:15 PM
*To:* Thomas, Jordan 
; mich...@databricks.com

*Cc:* user@spark.apache.org 
*Subject:* Re: Performance when iterating over many parquet files

Could you please elaborate on what kind of errors those bad
Parquet files are causing? In what ways are they miswritten?

Cheng

On 9/28/15 4:03 PM, jordan.tho...@accenture.com
 wrote:

Ah, yes, I see that it has been turned off now, that’s why it
wasn’t working.  Thank you, this is helpful!  The problem now
is to filter out bad (miswritten) Parquet files, as they are
causing this operation to fail.

Any suggestions on detecting them quickly and easily?

*From:*Cheng Lian [mailto:lian.cs@gmail.com]
*Sent:* Monday, September 28, 2015 5:56 PM
*To:* Thomas, Jordan 
; mich...@databricks.com

*Cc:* user@spark.apache.org 
*Subject:* Re: Performance when iterating over many parquet files

Also, you may find more details in the programming guide:

-

http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging
-

http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration

Cheng

On 9/28/15 3:54 PM, Cheng Lian wrote:

I guess you're probably using Spark 1.5? Spark SQL does
support schema merging, but we disabled it by default
since 1.5 because it introduces extra performance costs
(it's turned on by default in 1.4 and 1.3).

You may enable schema merging via either the Parquet data
source specific option "mergeSchema":

  sqlContext.read.option("mergeSchema", "true").parquet(path)

or the global SQL option "spark.sql.parquet.mergeSchema":

  sqlContext.sql("SET 

Re: Monitoring tools for spark streaming

2015-09-28 Thread Shixiong Zhu
Which version are you using? Could you take a look at the new Streaming UI
in 1.4.0?

Best Regards,
Shixiong Zhu

2015-09-29 7:52 GMT+08:00 Siva :

> Hi,
>
> Could someone recommend the monitoring tools for spark streaming?
>
> By extending StreamingListener we can dump the delay in processing of
> batches and some alert messages.
>
> But are there any Web UI tools where we can monitor failures, see delays
> in processing, error messages and setup alerts etc.
>
> Thanks
>
>


spark-submit classloader issue...

2015-09-28 Thread Rachana Srivastava
Hello all,

Goal:  I want to use APIs from the HttpClient library 4.4.1.  I am using the Maven 
Shade plugin to generate the JAR.



Findings: When I run my program as a Java application within Eclipse everything 
works fine.  But when I run the program using spark-submit I get the 
following error:

URL content Could not initialize class 
org.apache.http.conn.ssl.SSLConnectionSocketFactory

java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.http.conn.ssl.SSLConnectionSocketFactory



When I looked up which JAR the class is loaded from, it points to a Hadoop JAR.  I am 
assuming this is something set in spark-submit.



ClassLoader classLoader = HttpEndPointClient.class.getClassLoader();

URL resource = 
classLoader.getResource("org/apache/http/message/BasicLineFormatter.class");

Prints following jar:

jar:file:/usr/lib/hadoop/lib/httpcore-4.2.5.jar!/org/apache/http/message/BasicLineFormatter.class
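
The same lookup can be wrapped in a small helper to check several classes at once. This is a minimal sketch in Scala: `WhichJar` and `locate` are hypothetical names, and the class loader to inspect is passed in explicitly (for the case above it would be the loader of the application class).

```scala
import java.net.URL

// Hypothetical helper, not part of Spark or HttpClient.
object WhichJar {
  /** Location from which `loader` resolves the class file of `className`,
    * or None if the loader cannot find it. */
  def locate(className: String, loader: ClassLoader): Option[URL] = {
    // "org.apache.http.Foo" becomes "org/apache/http/Foo.class"
    val resource = className.replace('.', '/') + ".class"
    Option(loader.getResource(resource))
  }
}
```

Calling it for both org.apache.http.message.BasicLineFormatter and org.apache.http.conn.ssl.SSLConnectionSocketFactory would show whether httpcore and httpclient are being resolved from different places.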



After some research I found that I can override this with --conf 
spark.files.userClassPathFirst=true --conf spark.yarn.user.classpath.first=true



But when I do that I get the following error:

ERROR: org.apache.spark.executor.Executor - Exception in task 0.0 in stage 0.0 (TID 0)
java.io.InvalidClassException: org.apache.spark.scheduler.Task; local class incompatible: stream classdesc serialVersionUID = -4703555755588060120, local class serialVersionUID = -1589734467697262504
        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:94)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:185)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)



I am running on CDH 5.4.  Here is my complete pom file.



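<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>test</groupId>
  <artifactId>test</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpcore</artifactId>
      <version>4.4.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.4.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>1.5.0</version>
      <exclusions>
        <exclusion>
          <artifactId>httpcore</artifactId>
          <groupId>org.apache.httpcomponents</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.5.0</version>
      <exclusions>
        <exclusion>
          <artifactId>httpcore</artifactId>
          <groupId>org.apache.httpcomponents</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.5.0</version>
      <exclusions>
        <exclusion>
          <artifactId>httpcore</artifactId>
          <groupId>org.apache.httpcomponents</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.10</artifactId>
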

Re: Spark streaming job filling a lot of data in local spark nodes

2015-09-28 Thread Shixiong Zhu
These are just temporary files created by the shuffle. They are not
necessary for checkpointing and are only stored in your local temp directory,
which is "/tmp" by default. You can use `spark.local.dir` to
set the path if you find your "/tmp" doesn't have enough space.

Best Regards,
Shixiong Zhu

2015-09-29 1:04 GMT+08:00 swetha :

>
> Hi,
>
> I see a lot of data getting filled locally as shown below from my streaming
> job. I have my checkpoint set to hdfs. But, I still see the following data
> filling my local nodes. Any idea if I can make this stored in hdfs instead
> of storing the data locally?
>
> -rw-r--r--  1520 Sep 17 18:43 shuffle_23119_5_0.index
> -rw-r--r--  1 180564255 Sep 17 18:43 shuffle_23129_2_0.data
> -rw-r--r--  1 364850277 Sep 17 18:45 shuffle_23145_8_0.data
> -rw-r--r--  1  267583750 Sep 17 18:46 shuffle_23105_4_0.data
> -rw-r--r--  1  136178819 Sep 17 18:48 shuffle_23123_8_0.data
> -rw-r--r--  1  159931184 Sep 17 18:48 shuffle_23167_8_0.data
> -rw-r--r--  1520 Sep 17 18:49 shuffle_23315_7_0.index
> -rw-r--r--  1520 Sep 17 18:50 shuffle_23319_3_0.index
> -rw-r--r--  1   92240350 Sep 17 18:51 shuffle_23305_2_0.data
> -rw-r--r--  1   40380158 Sep 17 18:51 shuffle_23323_6_0.data
> -rw-r--r--  1  369653284 Sep 17 18:52 shuffle_23103_6_0.data
> -rw-r--r--  1  371932812 Sep 17 18:52 shuffle_23125_6_0.data
> -rw-r--r--  1   19857974 Sep 17 18:53 shuffle_23291_19_0.data
> -rw-r--r--  1  55342005 Sep 17 18:53 shuffle_23305_8_0.data
> -rw-r--r--  1   92920590 Sep 17 18:53 shuffle_23303_4_0.data
>
>
> Thanks,
> Swetha
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-job-filling-a-lot-of-data-in-local-spark-nodes-tp24846.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Is MLBase dead?

2015-09-28 Thread Justin Pihony
As in, is MLBase (MLO/MLI/MLlib) now simply org.apache.spark.mllib and
org.apache.spark.ml? I cannot find anything official, and the last updates
seem to be a year or two old.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-MLBase-dead-tp24854.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Monitoring tools for spark streaming

2015-09-28 Thread Hari Shreedharan
+1. The Streaming UI should give you more than enough information.




Thanks, Hari

On Mon, Sep 28, 2015 at 9:55 PM, Shixiong Zhu  wrote:

> Which version are you using? Could you take a look at the new Streaming UI
> in 1.4.0?
> Best Regards,
> Shixiong Zhu
> 2015-09-29 7:52 GMT+08:00 Siva :
>> Hi,
>>
>> Could someone recommend the monitoring tools for spark streaming?
>>
>> By extending StreamingListener we can dump the delay in processing of
>> batches and some alert messages.
>>
> >> But are there any Web UI tools where we can monitor failures, see delays
> >> in processing and error messages, set up alerts, etc.?
>>
>> Thanks
>>
>>

Re: spark.streaming.concurrentJobs

2015-09-28 Thread Shixiong Zhu
"writing to HBase for each partition in the RDDs from a given DStream is an
independent output operation"

This is not correct. "writing to HBase for each partition in the RDDs from
a given DStream" is just a task. And they already run in parallel.

The output operation is the DStream action, such as count, saveXXX, take.

For example, if "spark.streaming.concurrentJobs" is 1 and you call
DStream.count() twice, there will be two "count" Spark jobs and they will
run one by one. But if you set "spark.streaming.concurrentJobs" to 2, these
two "count" Spark jobs will run in parallel.

Moreover, "spark.streaming.concurrentJobs" is an internal configuration and
it may change in the future.
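
As a rough illustration of the point above, here is a plain-Python analogy
rather than Spark code: a thread pool stands in for whatever runs the
streaming jobs, and the pool size plays the role of
"spark.streaming.concurrentJobs". The names run_jobs and fake_count_job are
hypothetical and exist only for this sketch.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_jobs(concurrent_jobs, num_jobs=2, job_seconds=0.2):
    """Run num_jobs dummy jobs on a pool of concurrent_jobs threads.

    Returns the highest number of jobs observed running at the same time.
    """
    lock = threading.Lock()
    state = {"running": 0, "peak": 0}

    def fake_count_job(job_id):
        # Stand-in for one output operation (one Spark job per batch).
        with lock:
            state["running"] += 1
            state["peak"] = max(state["peak"], state["running"])
        time.sleep(job_seconds)  # pretend to do work
        with lock:
            state["running"] -= 1
        return job_id

    with ThreadPoolExecutor(max_workers=concurrent_jobs) as pool:
        list(pool.map(fake_count_job, range(num_jobs)))
    return state["peak"]


if __name__ == "__main__":
    print("pool size 1, peak concurrency:", run_jobs(1))
    print("pool size 2, peak concurrency:", run_jobs(2))
```

With a pool of one, the second job can only start after the first finishes;
with a pool of two, both can be in flight together. The tasks inside each
job are a separate matter and are not modelled here.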


Best Regards,
Shixiong Zhu

2015-09-26 3:34 GMT+08:00 Atul Kulkarni :

> Can someone please help, either by explaining or by pointing to documentation
> on the relationship between the number of executors needed and how to let
> the concurrent jobs created by the above parameter run in parallel?
>
> On Thu, Sep 24, 2015 at 11:56 PM, Atul Kulkarni 
> wrote:
>
>> Hi Folks,
>>
> >> I am trying to speed up my Spark Streaming job. I found a presentation by
> >> Tathagata Das that suggests increasing the value of
> >> "spark.streaming.concurrentJobs" if I have more than one output.
> >>
> >> In my Spark Streaming job I am reading from Kafka using the receiver-based
> >> approach, transforming each line of data from Kafka, and storing it in
> >> HBase. I do not intend to do any kind of collation at this stage. I believe
> >> this can be parallelized by creating a separate job to write a different
> >> set of lines from Kafka to HBase, and hence I set the above parameter to a
> >> value > 1. Is my assumption correct that writing to HBase for each partition
> >> in the RDDs from a given DStream is an independent output operation and can
> >> be parallelized?
> >>
> >> If the assumption is correct and I run the job, it creates
> >> multiple (smaller) jobs, but they are executed one after another, not in
> >> parallel. I am curious whether there is a requirement that #Executors be >= a
> >> particular number (a calculation based on how many repartitions after the
> >> union of DStreams, etc. I don't know; I am grasping at straws here).
>>
>> I would appreciate some help in this regard. Thanks in advance.
>>
>> --
>> Regards,
>> Atul Kulkarni
>>
>
>
>
> --
> Regards,
> Atul Kulkarni
>


Re: Get variable into Spark's foreachRDD function

2015-09-28 Thread Ankur Srivastava
Hi,

You are creating a logger instance on the driver and then trying to use that
instance in a transformation function that is executed on the executors.

You should create the logger instance in the transformation function itself,
but then the logs will go to separate files on each worker node.
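
A minimal sketch of that advice in Python, since the original question uses
PySpark. The logger name "myLogger" and the helper log_partition are made up
for illustration. The logger is looked up inside the function, so only the
function has to be shipped to the executors, not a logger object.

```python
import logging

LOGGER_NAME = "myLogger"  # hypothetical name, not taken from the original post


def log_partition(records):
    """Log every record of one partition and return how many were logged.

    The logger is obtained here, inside the function that runs on the
    executor, instead of being captured from the driver's scope.
    """
    logger = logging.getLogger(LOGGER_NAME)
    count = 0
    for record in records:
        logger.info("record: %s", record)
        count += 1
    return count


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    # Local stand-in for one partition of an RDD.
    print(log_partition(["a", "b", "c"]))
```

With a DStream this would be wired in along the lines of
someData.foreachRDD(lambda rdd: rdd.foreachPartition(log_partition)), where
the return value is not used. Each executor then writes to its own log output.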

Hope this helps.

Thanks
Ankur

On Mon, Sep 28, 2015 at 4:06 PM, markluk  wrote:

> I have a streaming Spark process and I need to do some logging in the
> `foreachRDD` function, but I'm having trouble accessing the logger as a
> variable in the `foreachRDD` function
>
> I would like to do the following
>
> import logging
>
> myLogger = logging.getLogger(LOGGER_NAME)
> ...
> ...
> someData = 
>
> someData.foreachRDD(lambda now, rdds : myLogger.info(  RDD>))
>
> Inside the lambda, it cannot access `myLogger`. I get a giant stacktrace -
> here is a snippet.
>
>
>   File
>
> "/juicero/press-mgmt/spark-1.5.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py",
> line 537, in save_reduce
> save(state)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
> f(self, obj) # Call unbound method with explicit self
>   File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
> save(element)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
> f(self, obj) # Call unbound method with explicit self
>   File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
> self._batch_setitems(obj.iteritems())
>   File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
> save(v)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
> f(self, obj) # Call unbound method with explicit self
>   File
>
> "/juicero/press-mgmt/spark-1.5.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py",
> line 315, in save_builtin_function
> return self.save_function(obj)
>   File
>
> "/juicero/press-mgmt/spark-1.5.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py",
> line 191, in save_function
> if islambda(obj) or obj.__code__.co_filename == '<stdin>' or
> themodule is None:
> AttributeError: 'builtin_function_or_method' object has no attribute
> '__code__'
>
>
>
> I don't understand why I can't access `myLogger`. Is it because Spark
> cannot serialize this logger object?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Get-variable-into-Spark-s-foreachRDD-function-tp24852.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Merging two avro RDD/DataFrames

2015-09-28 Thread TEST ONE
I have a daily update of modified users (~100s) output as avro from ETL.
I need to find and merge them with the corresponding existing members in a
master avro file (~100,000s). The merge operation involves merging a
‘profiles’ Map between the matching records.


What would be the recommended pattern to handle record merging with Spark?


Thanks,

kc


Setting executors per worker - Standalone

2015-09-28 Thread James Pirz
Hi,

I am using Spark 1.5 (standalone mode) on a cluster with 10 nodes, where
each machine has 12GB of RAM and 4 cores. On each machine I have one worker
running one executor that grabs all 4 cores. I would like to
check the performance with "one worker but 4 executors per machine - each
with one core".

I can see that "running multiple executors per worker in Standalone mode"
is possible based on the closed issue:

https://issues.apache.org/jira/browse/SPARK-1706

But I cannot find a way to do that. "SPARK_EXECUTOR_INSTANCES" is only
available in YARN mode, and in standalone mode I can only set
"SPARK_WORKER_INSTANCES", "SPARK_WORKER_CORES" and "SPARK_WORKER_MEMORY".

Any hint or suggestion would be great.


RE: nested collection object query

2015-09-28 Thread java8964
Your employee field is in fact an array of structs, not just a struct.
If you are using HiveContext, you can refer to it as follows:
select id from member where employee[0].name = 'employee0'
employee[0] points to the first element of the array.
If you want to query all the elements in the array, you have to use
"explode" in Hive.
See the Hive documentation for this:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-explode
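
To make the difference concrete, here is a plain-Python sketch (not Spark or
Hive code) of the two query shapes over records matching the schema in the
question. The sample data and the function names first_element_match and
any_element_match are invented for illustration; the second function mimics
what exploding the array and then filtering would select.

```python
# Sample rows shaped like the 'member' schema: an id plus an array of structs.
members = [
    {"id": "m1", "employee": [{"id": "e0", "name": "employee0"},
                              {"id": "e1", "name": "employee1"}]},
    {"id": "m2", "employee": [{"id": "e2", "name": "employee2"},
                              {"id": "e0", "name": "employee0"}]},
    {"id": "m3", "employee": []},
]


def first_element_match(rows, name):
    """Like: where employee[0].name = name (only the first element is checked)."""
    return [r["id"] for r in rows
            if r["employee"] and r["employee"][0]["name"] == name]


def any_element_match(rows, name):
    """Like exploding the array, then filtering on the exploded name."""
    exploded = [(r["id"], e) for r in rows for e in r["employee"]]
    return [member_id for member_id, e in exploded if e["name"] == name]


if __name__ == "__main__":
    print(first_element_match(members, "employee0"))
    print(any_element_match(members, "employee0"))
```

The first form misses member m2, whose matching employee is not at index 0;
the exploded form finds it.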
Yong

> Date: Mon, 28 Sep 2015 16:37:23 -0700
> From: tridib.sama...@live.com
> To: user@spark.apache.org
> Subject: nested collection object query
> 
> Hi Friends,
> What is the right syntax to query a collection of nested objects? I have the
> following schema and SQL, but the query does not return anything. Is the
> syntax correct?
> 
> root
>  |-- id: string (nullable = false)
>  |-- employee: array (nullable = false)
>  |    |-- element: struct (containsNull = true)
>  |    |    |-- id: string (nullable = false)
>  |    |    |-- name: string (nullable = false)
>  |    |    |-- speciality: string (nullable = false)
> 
> 
> select id from member where employee.name = 'employee0'
> 
> I uploaded a test in case someone wants to try it out: NestedObjectTest.java
> 
>   
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/nested-collection-object-query-tp24853.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> 
  

CassandraSQLContext throwing NullPointer Exception

2015-09-28 Thread Priya Ch
Hi All,

 I am trying to use dataframes (which contain data from Cassandra) in
rdd.foreach. This throws the exception shown below.

Is CassandraSQLContext accessible within an executor?

15/09/28 17:22:40 ERROR JobScheduler: Error running job streaming job
144344116 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage
6.0 (TID 83, blrrndnbudn05.rnd.amadeus.net): java.lang.NullPointerException
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:134)
at
org.apache.spark.sql.cassandra.CassandraSQLContext.cassandraSql(CassandraSQLContext.scala:74)
at
org.apache.spark.sql.cassandra.CassandraSQLContext.sql(CassandraSQLContext.scala:77)
at
com.amadeus.spark.example.SparkDemo$.withDataFrames(SparkDemo.scala:170)
at
com.amadeus.spark.example.SparkDemo$$anonfun$main$1$$anonfun$apply$1.apply(SparkDemo.scala:158)
at
com.amadeus.spark.example.SparkDemo$$anonfun$main$1$$anonfun$apply$1.apply(SparkDemo.scala:158)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


Re: CassandraSQLContext throwing NullPointer Exception

2015-09-28 Thread Ted Yu
Which Spark release are you using ?

Can you show the snippet of your code around CassandraSQLContext#sql() ?

Thanks

On Mon, Sep 28, 2015 at 6:21 AM, Priya Ch 
wrote:

> Hi All,
>
>  I am trying to use dataframes (which contain data from cassandra) in
> rdd.foreach. This is throwing the following exception:
>
> Is CassandraSQLContext accessible within executor 
>
> 15/09/28 17:22:40 ERROR JobScheduler: Error running job streaming job
> 144344116 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 6.0 (TID 83, blrrndnbudn05.rnd.amadeus.net):
> java.lang.NullPointerException
> at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:134)
> at
> org.apache.spark.sql.cassandra.CassandraSQLContext.cassandraSql(CassandraSQLContext.scala:74)
> at
> org.apache.spark.sql.cassandra.CassandraSQLContext.sql(CassandraSQLContext.scala:77)
> at
> com.amadeus.spark.example.SparkDemo$.withDataFrames(SparkDemo.scala:170)
> at
> com.amadeus.spark.example.SparkDemo$$anonfun$main$1$$anonfun$apply$1.apply(SparkDemo.scala:158)
> at
> com.amadeus.spark.example.SparkDemo$$anonfun$main$1$$anonfun$apply$1.apply(SparkDemo.scala:158)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
>


RE: Is this a Spark issue or Hive issue that Spark cannot read the string type data in the Parquet generated by Hive

2015-09-28 Thread java8964
Hi, Lian:
Thanks for the information. It works as expected in Spark with this setting.
Yong

Subject: Re: Is this a Spark issue or Hive issue that Spark cannot read the 
string type data in the Parquet generated by Hive
To: java8...@hotmail.com; user@spark.apache.org
From: lian.cs@gmail.com
Date: Fri, 25 Sep 2015 14:42:55 -0700


  

  
  
Please set the SQL option spark.sql.parquet.binaryAsString
to true when reading Parquet files containing strings generated by
Hive.



This is actually a bug in parquet-hive. When generating the Parquet
schema for a string field, Parquet requires a "UTF8" annotation,
something like:

message hive_schema {
  ...
  optional binary column2 (UTF8);
  ...
}

but parquet-hive fails to add it, and produces:

message hive_schema {
  ...
  optional binary column2;
  ...
}

Thus binary fields and string fields are made indistinguishable.
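
A simplified Python model of why the missing annotation matters. This is not
how Spark's Parquet reader is implemented; the functions logical_type and
read_value and their arguments are hypothetical. The sketch only mirrors the
behaviour described above: a binary column without UTF8 is exposed as raw
bytes unless the reader is told to treat binary as string.

```python
def logical_type(physical_type, annotation=None, binary_as_string=False):
    """Pick the type a reader would expose for a Parquet column (toy model)."""
    if physical_type != "binary":
        return physical_type
    if annotation == "UTF8" or binary_as_string:
        return "string"
    return "binary"


def read_value(raw, column_type):
    """Decode raw bytes only when the column is exposed as a string."""
    return raw.decode("utf-8") if column_type == "string" else raw


if __name__ == "__main__":
    raw = b"hello"
    # Schema written with the annotation: read back as a string.
    print(read_value(raw, logical_type("binary", "UTF8")))
    # Schema written without it: the value stays as bytes ...
    print(read_value(raw, logical_type("binary")))
    # ... unless the reader is asked to treat binary as string.
    print(read_value(raw, logical_type("binary", binary_as_string=True)))
```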

  

  Interestingly, there's another bug in parquet-thrift, which always
  adds UTF8 annotation to all binary fields :)

  

  Cheng

  

  On 9/25/15 2:03 PM, java8964 wrote:



  
  Hi, Spark Users:



I have a problem: Spark cannot recognize the
  string type in the Parquet schema generated by Hive.



Version of all components:



Spark 1.3.1
Hive 0.12.0
Parquet 1.3.2



I generated a detailed low-level table in the Parquet format
  using MapReduce Java code. This table can be read in Hive
  and Spark without any issue.

Now I create a Hive aggregation table as follows:



create external table T (
  column1 bigint,
  column2 string,
  ..
)
partitioned by (dt string)
ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
STORED AS
  INPUTFORMAT "parquet.hive.DeprecatedParquetInputFormat"
  OUTPUTFORMAT "parquet.hive.DeprecatedParquetOutputFormat"
location '/hdfs_location'




Then the table is populated in the Hive by:




  set hive.exec.compress.output=true;
  set parquet.compression=snappy;




insert into table T partition(dt='2015-09-23')
select 
.
from Detail_Table
group by 



After this, we can query the T table in the Hive without
  issue.



But if I try to use it in Spark 1.3.1 as follows:

import org.apache.spark.sql.SQLContext
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val v_event_cnt = sqlContext.parquetFile("/hdfs_location/dt=2015-09-23")




scala> v_event_cnt.printSchema
root
 |-- column1: long (nullable = true)
 |-- column2: binary (nullable = true)
 |-- 
 |-- dt: string (nullable = true)




Spark recognizes column2 as binary type instead of string type in this
case, but in Hive it works fine.
This causes a problem: in Spark, the data is dumped as "[B@e353d68".
To use it in Spark, I have to cast it to string to get the correct
value out of it.

I wonder which part causes this type mismatch in the Parquet file.
Is Hive not generating the correct Parquet file with its schema, or
is Spark unable to recognize it due to a problem in Spark?

Is there anything I can do in either Hive or Spark to make this
Parquet schema correct on both ends?


  
Thanks


  
Yong
  


  

Re: Performance when iterating over many parquet files

2015-09-28 Thread Michael Armbrust
sqlContext.read.parquet

takes lists of files.

// This works, but using Spark to read the file list is possibly overkill.
val fileList = sc.textFile("file_list.txt").collect()
val dataFrame = sqlContext.read.parquet(fileList: _*)
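
For the file list itself, a plain read on the driver is enough. The sketch
below is in Python and assumes a text file with one Parquet path per line;
the helper name read_file_list and the sample paths are made up. The final
call to the DataFrame reader is left as a comment because it needs a running
sqlContext.

```python
import os
import tempfile


def read_file_list(path):
    """Return the non-empty, stripped lines of a text file as a list of paths."""
    with open(path) as handle:
        return [line.strip() for line in handle if line.strip()]


if __name__ == "__main__":
    # Write a small sample list so the sketch runs on its own.
    with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
        tmp.write("/data/a.parquet\n\n/data/b.parquet\n")
    try:
        paths = read_file_list(tmp.name)
        print(paths)
        # In PySpark the list would then be passed as varargs, e.g.:
        # data_frame = sqlContext.read.parquet(*paths)
    finally:
        os.remove(tmp.name)
```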

On Mon, Sep 28, 2015 at 1:35 PM, jwthomas 
wrote:

> We are working with use cases where we need to do batch processing on a
> large number (hundreds of thousands) of Parquet files.  The processing is
> quite similar per file.  There are many aggregates that are very SQL-friendly
> (computing averages, maxima, minima, aggregations on single columns with
> some selection criteria).  There is also some more advanced time-series
> processing (continuous wavelet transforms and the like).  This all seems
> like a good use case for Spark.
>
> But I'm having performance problems.  Let's take a look at something very
> simple, which simply checks whether the parquet files are readable.
>
> Code that seems natural but doesn't work:
>
> import scala.util.{Try, Success, Failure}
> val parquetFiles = sc.textFile("file_list.txt")
> val successes = parquetFiles.map(x => (x, Try(sqlContext.read.parquet(x))))
>   .filter(_._2.isSuccess).map(x => x._1)
>
> My understanding is that this doesn't work because sqlContext can't be used
> inside of a transformation like "map" (or inside an action).  That it only
> makes sense in the driver.  Thus, it becomes a null reference in the above
> code, so all reads fail.
>
> Code that works:
>
> import scala.util.{Try, Success, Failure}
> val parquetFiles = sc.textFile("file_list.txt")
> val successes = parquetFiles.collect().map(x => (x, Try(sqlContext.read.parquet(x))))
>   .filter(_._2.isSuccess).map(x => x._1)
>
>
> This works because the collect() means that everything happens back on the
> driver.  So the sqlContext object makes sense and everything works fine.
>
> But it is slow.  I'm using yarn-client mode on a 6-node cluster with 17
> executors, 40 GB RAM on the driver, and 19 GB on the executors.  It takes
> about 1 minute to execute for 100 Parquet files, which is too long.  Recall
> that we need to do this across hundreds of thousands of files.
>
> I realize it is possible to parallelize after the read:
>
> import scala.util.{Try, Success, Failure}
> val parquetFiles = sc.textFile("file_list.txt")
> val intermediate_successes =
>   parquetFiles.collect().map(x => (x, Try(sqlContext.read.parquet(x))))
> val dist_successes = sc.parallelize(intermediate_successes)
> val successes = dist_successes.filter(_._2.isSuccess).map(x => x._1)
>
>
> But this does not improve performance substantially.  It seems the
> bottleneck is that the reads are happening sequentially.
>
> Is there a better way to do this?
>
> Thanks,
> Jordan
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Performance-when-iterating-over-many-parquet-files-tp24850.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Is there any tool provides per-task monitoring to figure out task skew in Spark streaming?

2015-09-28 Thread 이기석
Hi, I am a graduate student studying Spark Streaming for research purposes.

I want to know whether there is task skew in my streaming application.
But as far as I can tell, the Spark UI does not provide any useful
information to figure this out.

I found a related work from Spark Summit 2014:

*Sparkling: Identification of Task Skew and Speculative Partition of Data
for Spark applications*
(
https://spark-summit.org/2014/talk/sparkling-identification-of-task-skew-and-speculative-partition-of-data-for-spark-applications
)

However, it does not seem to be available for public use.
Is there any useful tool that I can use to find task skew?


Thanks in advance.


SparkContext._active_spark_context returns None

2015-09-28 Thread YiZhi Liu
Hi,

I'm doing some data processing in PySpark, but I failed to reach the JVM
from the workers. Here is what I did:

$ bin/pyspark
>>> data = sc.parallelize(["123", "234"])
>>> numbers = data.map(lambda s: SparkContext._active_spark_context._jvm.java.lang.Integer.valueOf(s.strip()))
>>> numbers.collect()

I got,

Caused by: org.apache.spark.api.python.PythonException: Traceback
(most recent call last):
  File 
"/mnt/hgfs/lewis/Workspace/source-codes/spark/python/lib/pyspark.zip/pyspark/worker.py",
line 111, in main
process()
  File 
"/mnt/hgfs/lewis/Workspace/source-codes/spark/python/lib/pyspark.zip/pyspark/worker.py",
line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
  File 
"/mnt/hgfs/lewis/Workspace/source-codes/spark/python/lib/pyspark.zip/pyspark/serializers.py",
line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
  File "<stdin>", line 1, in <lambda>
AttributeError: 'NoneType' object has no attribute '_jvm'

at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more

While _jvm at the driver end looks fine:

>>> SparkContext._active_spark_context._jvm.java.lang.Integer.valueOf("123".strip())
123

The program is trivial; I just wonder what the right way is to reach the
JVM from Python. Any help would be appreciated.

Thanks

-- 
Yizhi Liu
Senior Software Engineer / Data Mining
www.mvad.com, Shanghai, China




Spark SQL: Implementing Custom Data Source

2015-09-28 Thread Jerry Lam
Hi spark users and developers,

I'm trying to learn how to implement a custom data source for Spark SQL. Is
there any documentation that I can use as a reference? I'm not sure exactly
what needs to be extended/implemented. A general workflow would be very
helpful!

Best Regards,

Jerry


Update cassandra rows problem

2015-09-28 Thread amine_901
Hello all,
I'm using Spark 1.2 with Spark Cassandra Connector 1.2.3, and
I'm trying to update some rows of a table.

Example:
CREATE TABLE myTable (
a text,
b text,
c text,
date timestamp,
d text,
e text static,
f text static,
PRIMARY KEY ((a, b, c), date, d)
) WITH CLUSTERING ORDER BY (date ASC, d ASC)

val interactions = sc.cassandraTable[(String, String, String, DateTime,
String, String)]("keySpace", "myTable").
select("a","b","c","date", "d", "e","f")
val empty = interactions.filter(r => r._6 == null).cache()
empty.count()

I just count the number of rows containing null for "e" and then replace
them with the value of "b":

val update_inter = empty.map( r =>  (r._1,r._2, r._3, r._4, r._5, r._2))
update_inter.saveToCassandra("keySpace", "myTable",
SomeColumns("a","b","c","date", "d", "e", "f"))

This works when I check in cqlsh, but I still get the value null when I
request the same rows through the Spark Cassandra Connector.

Is this a bug in the Spark Cassandra Connector? Thanks for your help.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Update-cassandra-rows-problem-tp24844.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
