Re: sc.parallelize with defaultParallelism=1

2015-09-30 Thread Andy Dang
Can't you just load the data from HBase first, and then call sc.parallelize
on your dataset?

-Andy

---
Regards,
Andy (Nam) Dang

On Wed, Sep 30, 2015 at 12:52 PM, Nicolae Marasoiu <
nicolae.maras...@adswizz.com> wrote:

> Hi,
>
>
> When calling sc.parallelize(data,1), is there a preference where to put
> the data? I see 2 possibilities: sending it to a worker node, or keeping it
> on the driver program.
>
>
> I would prefer to keep the data local to the driver. The use case is when
> I need just to load a bit of data from HBase, and then compute over it e.g.
> aggregate, using Spark.
>
>
> Thanks,
>
> Nicu
>


Re: sc.parallelize with defaultParallelism=1

2015-09-30 Thread Nicolae Marasoiu
That's exactly what I am doing, but my question is: does parallelize send the
data to a worker node? From a performance perspective on small sets, the ideal
would be to load it into the local JVM memory of the driver. Even designating the
current machine as a worker node, besides the driver, would still mean localhost
loopback/network communication. I guess Spark is a batch-oriented system, and I am
still checking whether there are ways to use it like this too: load data manually,
but process it with the functional and other Spark libraries, without the
distribution or map/reduce part.



From: Andy Dang 
Sent: Wednesday, September 30, 2015 8:17 PM
To: Nicolae Marasoiu
Cc: user@spark.apache.org
Subject: Re: sc.parallelize with defaultParallelism=1

Can't you just load the data from HBase first, and then call sc.parallelize on 
your dataset?

-Andy

---
Regards,
Andy (Nam) Dang

On Wed, Sep 30, 2015 at 12:52 PM, Nicolae Marasoiu wrote:

Hi,


When calling sc.parallelize(data,1), is there a preference where to put the 
data? I see 2 possibilities: sending it to a worker node, or keeping it on the 
driver program.


I would prefer to keep the data local to the driver. The use case is when I 
need just to load a bit of data from HBase, and then compute over it e.g. 
aggregate, using Spark.


Thanks,

Nicu



Metadata in Parquet

2015-09-30 Thread Philip Weaver
Hi, I am using org.apache.spark.sql.types.Metadata to store extra
information along with each of my fields. I'd also like to store Metadata
for the entire DataFrame, not attached to any specific field. Is this
supported?

- Philip
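
A minimal sketch of the per-field side of this (Spark 1.x API), assuming an
existing DataFrame df with columns "latency" and "host"; the column names, the
path and the "units" key are made up for illustration:

import org.apache.spark.sql.types.{Metadata, MetadataBuilder}

// Attach metadata to a single field by aliasing the column with it.
val fieldMeta: Metadata = new MetadataBuilder().putString("units", "ms").build()
val withMeta = df.select(df("latency").as("latency", fieldMeta), df("host"))

// The field-level metadata is carried in the schema and survives a Parquet round trip.
withMeta.write.parquet("/tmp/with_meta")
val readBack = sqlContext.read.parquet("/tmp/with_meta")
println(readBack.schema("latency").metadata.getString("units"))

Metadata attached to the DataFrame as a whole (not to a field) is a different
matter; see the reply from Cheng Lian further down.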


Re: Hive permanent functions are not available in Spark SQL

2015-09-30 Thread Pala M Muthaia
+user list

On Tue, Sep 29, 2015 at 3:43 PM, Pala M Muthaia  wrote:

> Hi,
>
> I am trying to use internal UDFs that we have added as permanent functions
> to Hive, from within a Spark SQL query (using HiveContext), but I encounter
> NoSuchObjectException, i.e. the function could not be found.
>
> However, if I execute the 'show functions' command in Spark SQL, the permanent
> functions appear in the list.
>
> I am using Spark 1.4.1 with Hive 0.13.1. I tried to debug this by looking
> at the log and code. It seems the show functions command and the UDF query
> go through essentially the same code path, yet the former can see the UDF
> while the latter can't.
>
> Any ideas on how to debug/fix this?
>
>
> Thanks,
> pala
>


Re: sc.parallelize with defaultParallelism=1

2015-09-30 Thread Marcelo Vanzin
If you want to process the data locally, why do you need to use sc.parallelize?

Store the data in regular Scala collections and use their methods to
process them (they have pretty much the same set of methods as Spark
RDDs). Then when you're happy, finally use Spark to process the
pre-processed input data.

Or you can run Spark in "local" mode, in which case the executor(s)
run in the same VM as the master.

Unless I'm misunderstanding what it is you're trying to achieve here?
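
A sketch of both suggestions in Scala, assuming a spark-shell-style `sc`; the
Event case class and the sample data stand in for whatever small HBase read is
actually involved:

import org.apache.spark.SparkConf

// (a) Keep the small dataset in plain Scala collections on the driver.
case class Event(key: String, value: Long)
val events: Seq[Event] = Seq(Event("a", 1L), Event("a", 2L), Event("b", 5L))
val aggregated: Map[String, Long] =
  events.groupBy(_.key).mapValues(_.map(_.value).sum)   // pure driver-side work

// Hand the result to Spark only if a distributed step is really needed.
val rdd = sc.parallelize(aggregated.toSeq, numSlices = 1)

// (b) Or run Spark in local mode, so the executor lives in the driver JVM and
// parallelize never crosses the network:
val localConf = new SparkConf().setAppName("small-data").setMaster("local[*]")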


On Wed, Sep 30, 2015 at 10:25 AM, Nicolae Marasoiu wrote:
> That's exactly what I am doing, but my question is does parallelize send the
> data to a worker node. From a performance perspective on small sets, the
> ideal would be to load in local jvm memory of the driver. I mean even
> designating the current machine as a worker node, besides driver, would
> still mean a localhost lo/net communication. I guess Spark is a batch
> oriented system, and I am still checking if there are ways to use it like
> this too, load data manually but process it with the functional & other
> spark libraries but without the distribution or m/r part.
>
>
>
> 
> From: Andy Dang 
> Sent: Wednesday, September 30, 2015 8:17 PM
> To: Nicolae Marasoiu
> Cc: user@spark.apache.org
> Subject: Re: sc.parallelize with defaultParallelism=1
>
> Can't you just load the data from HBase first, and then call sc.parallelize
> on your dataset?
>
> -Andy
>
> ---
> Regards,
> Andy (Nam) Dang
>
> On Wed, Sep 30, 2015 at 12:52 PM, Nicolae Marasoiu
>  wrote:
>>
>> Hi,
>>
>>
>> When calling sc.parallelize(data,1), is there a preference where to put
>> the data? I see 2 possibilities: sending it to a worker node, or keeping it
>> on the driver program.
>>
>>
>> I would prefer to keep the data local to the driver. The use case is when
>> I need just to load a bit of data from HBase, and then compute over it e.g.
>> aggregate, using Spark.
>>
>>
>> Thanks,
>>
>> Nicu
>
>



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [cache eviction] partition recomputation in big lineage RDDs

2015-09-30 Thread Nicolae Marasoiu
Hi,


An equivalent question would be: can the memory cache be selectively evicted 
from within a component run in the driver? I know it is breaking some 
abstraction/encapsulation, but clearly I need to evict part of the cache so 
that it is reloaded with newer values from DB.


Because what I basically need is invalidating some portions of the data which 
have newer values. The "compute" method should be the same (read with 
TableInputFormat).

Thanks
Nicu

From: Nicolae Marasoiu 
Sent: Wednesday, September 30, 2015 4:07 PM
To: user@spark.apache.org
Subject: Re: partition recomputation in big lineage RDDs


Hi,

In fact, my RDD will get a new version (a new RDD assigned to the same var)
quite frequently, by merging bulks of 1000 events from the last 10s.

But recomputation would be more efficient if done not by reading the initial RDD
partition(s) and reapplying deltas, but by reading the latest data from HBase
and just computing on top of that, if anything.

Basically I guess I need to write my own RDD and implement compute method by 
sliding on hbase.

Thanks,
Nicu

From: Nicolae Marasoiu 
Sent: Wednesday, September 30, 2015 3:05 PM
To: user@spark.apache.org
Subject: partition recomputation in big lineage RDDs


Hi,


If I implement a manner to have an up-to-date version of my RDD by ingesting 
some new events, called RDD_inc (from increment), and I provide a "merge" 
function m(RDD, RDD_inc), which returns the RDD_new, it looks like I can evolve 
the state of my RDD by constructing new RDDs all the time, and doing it in a 
manner that hopes to reuse as much data from the past RDD and make the rest 
garbage collectable. An example merge function would be a join on some ids, and 
creating a merged state for each element. The type of the result of m(RDD, 
RDD_inc) is the same type as that of RDD.


My question on this is how does the recomputation work for such an RDD, which 
is not the direct result of hdfs load, but is the result of a long lineage of 
such functions/transformations:


Lets say my RDD is now after 2 merge iterations like this:

RDD_new = merge(merge(RDD, RDD_inc1), RDD_inc2)


When recomputing a part of RDD_new here are my assumptions:

- only full partitions are recomputed, nothing more granular?

- the corresponding partitions of RDD, RDD_inc1 and RDD_inc2 are recomputed

- the functions are applied


And this seems more simplistic, since the partitions do not fully align in the 
general case between all these RDDs. The other aspect is the potentially 
redundant load of data which is in fact not required anymore (the data ruled 
out in the merge).


A more detailed version of this question is at 
https://www.quora.com/How-does-Spark-RDD-recomputation-avoids-duplicate-loading-or-computation/


Thanks,

Nicu
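
A sketch of the kind of merge function described above, in Scala; the State
type, its combine method and the String key are made up for illustration, and
fullOuterJoin is just one way to express "join on some ids":

import org.apache.spark.SparkContext._   // pair-RDD functions (Spark 1.x)
import org.apache.spark.rdd.RDD

// Hypothetical per-key state with some way of combining two versions.
case class State(value: Long) {
  def combine(other: State): State = State(value + other.value)
}

// m(RDD, RDD_inc): join on the id and build the merged state for each element.
def merge(current: RDD[(String, State)], increment: RDD[(String, State)]): RDD[(String, State)] =
  current.fullOuterJoin(increment).mapValues {
    case (Some(old), Some(inc)) => old.combine(inc)
    case (Some(old), None)      => old
    case (None, Some(inc))      => inc
    case (None, None)           => sys.error("unreachable for a full outer join")
  }

// RDD_new = merge(merge(RDD, RDD_inc1), RDD_inc2), exactly as in the lineage above.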


Re: Metadata in Parquet

2015-09-30 Thread Cheng Lian
Unfortunately this isn't supported at the moment 
https://issues.apache.org/jira/browse/SPARK-10803


Cheng

On 9/30/15 10:54 AM, Philip Weaver wrote:
Hi, I am using org.apache.spark.sql.types.Metadata to store extra 
information along with each of my fields. I'd also like to store 
Metadata for the entire DataFrame, not attached to any specific field. 
Is this supported?


- Philip




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Fetching Date value from RDD of type spark.sql.row

2015-09-30 Thread satish chandra j
HI All,
I am currently using Spark 1.2.2. As the getDate method is not defined in *Public
Class Row* for this Spark version, I am trying to fetch the Date value of a
specific column using the *get* method specified in the API docs mentioned
below:

https://spark.apache.org/docs/1.2.2/api/java/index.html?org/apache/spark/sql/api/java/Row.html

But getting an error: "value get is not a member of
org.apache.spark.sql.row"

Let me know if there is an alternate method to fetch the Date from a Row.

Regards,
Satish Chandra
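
One possible workaround sketch in Scala, untested against 1.2.2 and assuming the
column really holds a java.sql.Date: the Scala Row exposes apply(i), which
returns Any and can be cast (the `rows` value and the column index are
illustrative):

import java.sql.Date

// rows is an RDD[Row] / SchemaRDD obtained from sqlContext.sql(...) in Spark 1.2.x
val dates = rows.map(row => row(2).asInstanceOf[Date])   // column 2 assumed to be the DATE column
dates.take(5).foreach(println)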


Re: Hive ORC Malformed while loading into spark data frame

2015-09-30 Thread Umesh Kacha
Hi Zang thanks much please find the code below

Working code loading data from a path created by Hive table using hive
console outside of spark :

DataFrame df =
hiveContext.read().format("orc").load("/hdfs/path/to/hive/table/partition")

Not working code inside spark hive tables created using hiveContext.sql
insert into partition queries

DataFrame df =
hiveContext.read().format("orc").load("/hdfs/path/to/hive/table/partition/created/by/spark")

As you can see above, the call is the same in both cases; the second one is just
trying to load ORC data created by Spark.
On Sep 30, 2015 11:22 AM, "Zhan Zhang"  wrote:

> Hi Umesh,
>
> The potential reason is that Hive and Spark do not use the same
> OrcInputFormat. Newer Hive versions have a NewOrcInputFormat, but it is
> not in Spark because of backward compatibility (it is not available in
> hive-0.12).
> Do you mind post the code that works and not works for you?
>
> Thanks.
>
> Zhan Zhang
>
> On Sep 29, 2015, at 10:05 PM, Umesh Kacha  wrote:
>
> Hi I can read/load orc data created by hive table in a dataframe why is it
> throwing Malformed ORC exception when I try to load data created by
> hiveContext.sql into dataframe?
> On Sep 30, 2015 2:37 AM, "Hortonworks"  wrote:
>
>> You can try to use data frame for both read and write
>>
>> Thanks
>>
>> Zhan Zhang
>>
>>
>> Sent from my iPhone
>>
>> On Sep 29, 2015, at 1:56 PM, Umesh Kacha  wrote:
>>
>> Hi Zang, thanks for the response. Table is created using Spark
>> hiveContext.sql and data inserted into table also using hiveContext.sql.
>> Insert into partition table. When I try to load orc data into dataframe I
>> am loading particular partition data stored in path say
>> /user/xyz/Hive/xyz.db/sparktable/partition1=abc
>>
>> Regards,
>> Umesh
>> On Sep 30, 2015 02:21, "Hortonworks"  wrote:
>>
>>> How was the table generated, by Hive or by Spark?
>>>
>>> If you generate the table using Hive but read it with a data frame, there
>>> may be a compatibility issue.
>>>
>>> Thanks
>>>
>>> Zhan Zhang
>>>
>>>
>>> Sent from my iPhone
>>>
>>> > On Sep 29, 2015, at 1:47 PM, unk1102  wrote:
>>> >
>>> > Hi I have a spark job which creates hive tables in orc format with
>>> > partitions. It works well I can read data back into hive table using
>>> hive
>>> > console. But if I try further process orc files generated by Spark job
>>> by
>>> > loading into dataframe  then I get the following exception
>>> > Caused by: java.io.IOException: Malformed ORC file
>>> > hdfs://localhost:9000/user/hive/warehouse/partorc/part_tiny.txt.
>>> Invalid
>>> > postscript.
>>> >
>>> > Dataframe df = hiveContext.read().format("orc").load(to/path);
>>> >
>>> > Please guide.
>>> >
>>> >
>>> >
>>> > --
>>> > View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Hive-ORC-Malformed-while-loading-into-spark-data-frame-tp24876.html
>>> > Sent from the Apache Spark User List mailing list archive at
>>> Nabble.com .
>>> >
>>> > -
>>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> > For additional commands, e-mail: user-h...@spark.apache.org
>>> >
>>> >
>>>
>
>
>


Re: UnknownHostException with Mesos and custom Jar

2015-09-30 Thread Akhil Das
Can you try replacing your code with the hdfs uri? like:

sc.textFile("hdfs://...").collect().foreach(println)

Thanks
Best Regards

On Tue, Sep 29, 2015 at 1:45 AM, Stephen Hankinson 
wrote:

> Hi,
>
> Wondering if anyone can help me with the issue I am having.
>
> I am receiving an UnknownHostException when running a custom jar with
> Spark on Mesos. The issue does not happen when running spark-shell.
>
> My spark-env.sh contains the following:
>
> export MESOS_NATIVE_JAVA_LIBRARY=/usr/local/lib/libmesos.so
>
> export HADOOP_CONF_DIR=/hadoop-2.7.1/etc/hadoop/
>
> My spark-defaults.conf contains the following:
>
> spark.master   mesos://zk://172.31.0.81:2181,
> 172.31.16.81:2181,172.31.32.81:2181/mesos
>
> spark.mesos.executor.home  /spark-1.5.0-bin-hadoop2.6/
>
> Starting spark-shell as follows and running the following line works
> correctly:
>
> /spark-1.5.0-bin-hadoop2.6/bin/spark-shell
>
> sc.textFile("/tmp/Input").collect.foreach(println)
>
> 15/09/28 20:04:49 INFO storage.MemoryStore: ensureFreeSpace(88528) called
> with curMem=0, maxMem=556038881
>
> 15/09/28 20:04:49 INFO storage.MemoryStore: Block broadcast_0 stored as
> values in memory (estimated size 86.5 KB, free 530.2 MB)
>
> 15/09/28 20:04:49 INFO storage.MemoryStore: ensureFreeSpace(20236) called
> with curMem=88528, maxMem=556038881
>
> 15/09/28 20:04:49 INFO storage.MemoryStore: Block broadcast_0_piece0
> stored as bytes in memory (estimated size 19.8 KB, free 530.2 MB)
>
> 15/09/28 20:04:49 INFO storage.BlockManagerInfo: Added broadcast_0_piece0
> in memory on 172.31.21.104:49048 (size: 19.8 KB, free: 530.3 MB)
>
> 15/09/28 20:04:49 INFO spark.SparkContext: Created broadcast 0 from
> textFile at <console>:22
>
> 15/09/28 20:04:49 INFO mapred.FileInputFormat: Total input paths to
> process : 1
>
> 15/09/28 20:04:49 INFO spark.SparkContext: Starting job: collect at
> <console>:22
>
> 15/09/28 20:04:49 INFO scheduler.DAGScheduler: Got job 0 (collect at
> <console>:22) with 3 output partitions
>
> 15/09/28 20:04:49 INFO scheduler.DAGScheduler: Final stage: ResultStage
> 0(collect at <console>:22)
>
> 15/09/28 20:04:49 INFO scheduler.DAGScheduler: Parents of final stage:
> List()
>
> 15/09/28 20:04:49 INFO scheduler.DAGScheduler: Missing parents: List()
>
> 15/09/28 20:04:49 INFO scheduler.DAGScheduler: Submitting ResultStage 0
> (MapPartitionsRDD[1] at textFile at <console>:22), which has no missing
> parents
>
> 15/09/28 20:04:49 INFO storage.MemoryStore: ensureFreeSpace(3120) called
> with curMem=108764, maxMem=556038881
>
> 15/09/28 20:04:49 INFO storage.MemoryStore: Block broadcast_1 stored as
> values in memory (estimated size 3.0 KB, free 530.2 MB)
>
> 15/09/28 20:04:49 INFO storage.MemoryStore: ensureFreeSpace(1784) called
> with curMem=111884, maxMem=556038881
>
> 15/09/28 20:04:49 INFO storage.MemoryStore: Block broadcast_1_piece0
> stored as bytes in memory (estimated size 1784.0 B, free 530.2 MB)
>
> 15/09/28 20:04:49 INFO storage.BlockManagerInfo: Added broadcast_1_piece0
> in memory on 172.31.21.104:49048 (size: 1784.0 B, free: 530.3 MB)
>
> 15/09/28 20:04:49 INFO spark.SparkContext: Created broadcast 1 from
> broadcast at DAGScheduler.scala:861
>
> 15/09/28 20:04:49 INFO scheduler.DAGScheduler: Submitting 3 missing tasks
> from ResultStage 0 (MapPartitionsRDD[1] at textFile at <console>:22)
>
> 15/09/28 20:04:49 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0
> with 3 tasks
>
> 15/09/28 20:04:49 INFO scheduler.TaskSetManager: Starting task 0.0 in
> stage 0.0 (TID 0, ip-172-31-37-82.us-west-2.compute.internal, NODE_LOCAL,
> 2142 bytes)
>
> 15/09/28 20:04:49 INFO scheduler.TaskSetManager: Starting task 1.0 in
> stage 0.0 (TID 1, ip-172-31-21-104.us-west-2.compute.internal, NODE_LOCAL,
> 2142 bytes)
>
> 15/09/28 20:04:49 INFO scheduler.TaskSetManager: Starting task 2.0 in
> stage 0.0 (TID 2, ip-172-31-4-4.us-west-2.compute.internal, NODE_LOCAL,
> 2142 bytes)
>
> 15/09/28 20:04:52 INFO storage.BlockManagerMasterEndpoint: Registering
> block manager ip-172-31-4-4.us-west-2.compute.internal:50648 with 530.3 MB
> RAM, BlockManagerId(20150928-190245-1358962604-5050-11297-S2,
> ip-172-31-4-4.us-west-2.compute.internal, 50648)
>
> 15/09/28 20:04:52 INFO storage.BlockManagerMasterEndpoint: Registering
> block manager ip-172-31-37-82.us-west-2.compute.internal:52624 with 530.3
> MB RAM, BlockManagerId(20150928-190245-1358962604-5050-11297-S1,
> ip-172-31-37-82.us-west-2.compute.internal, 52624)
>
> 15/09/28 20:04:52 INFO storage.BlockManagerMasterEndpoint: Registering
> block manager ip-172-31-21-104.us-west-2.compute.internal:56628 with 530.3
> MB RAM, BlockManagerId(20150928-190245-1358962604-5050-11297-S0,
> ip-172-31-21-104.us-west-2.compute.internal, 56628)
>
> 15/09/28 20:04:52 INFO storage.BlockManagerInfo: Added broadcast_1_piece0
> in memory on ip-172-31-37-82.us-west-2.compute.internal:52624 (size: 1784.0
> B, free: 530.3 MB)
>
> 15/09/28 20:04:52 INFO storage.BlockManagerInfo: Added broadcast_1_piece0
> in memory on 

Re: Reading kafka stream and writing to hdfs

2015-09-30 Thread Akhil Das
Like:

counts.saveAsTextFiles("hdfs://host:port/some/location")


Thanks
Best Regards

On Tue, Sep 29, 2015 at 2:15 AM, Chengi Liu  wrote:

> Hi,
>   I am going thru this example here:
>
> https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/kafka_wordcount.py
> If I want to write this data on hdfs.
> Whats the right way to do this?
> Thanks
>


Re: Submitting with --deploy-mode cluster: uploading the jar

2015-09-30 Thread Saisai Shao
As I remember, you don't need to upload the application jar manually; Spark
will do it for you when you use spark-submit. Would you mind posting
your spark-submit command?


On Wed, Sep 30, 2015 at 3:13 PM, Christophe Schmitz 
wrote:

> Hi there,
>
> I am trying to use the "--deploy-mode cluster" option to submit my job
> (spark 1.4.1). When I do that, the spark-driver (on the cluster) is looking
> for my application jar. I can manually copy my application jar on all the
> workers, but I was wondering if there is a way to submit the application
> jar when running spark-submit.
>
> Thanks!
>


Re: Setting Spark TMP Directory in Cluster Mode

2015-09-30 Thread mufy
Any takers? :-)


---
Mufeed Usman
My LinkedIn  | My
Social Cause  | My Blogs : LiveJournal





On Mon, Sep 28, 2015 at 10:19 AM, mufy  wrote:

> Hello Akhil,
>
> I do not see how that would work for a YARN cluster mode execution
> since the local directories used by the Spark executors and the Spark
> driver are the local directories that are configured for YARN
> (yarn.nodemanager.local-dirs). If you specify a different path with
> SPARK_LOCAL_DIRS, that path will be ignored.
>
>
> ---
> Mufeed Usman
> My LinkedIn  | My
> Social Cause  | My Blogs : LiveJournal
> 
>
>


Re: Spark thrift service and Hive impersonation.

2015-09-30 Thread Steve Loughran

On 30 Sep 2015, at 03:24, Mohammed Guller 
> wrote:

Does each user needs to start own thrift server to use it?

No. One of the benefits of the Spark Thrift Server is that it allows multiple 
users to share a single SparkContext.

Most likely, you have a file permissions issue.



I don't think the spark hive thrift server does the multi-user stuff (yet)

Mohammed

From: Jagat Singh [mailto:jagatsi...@gmail.com]
Sent: Tuesday, September 29, 2015 5:30 PM
To: SparkUser
Subject: Spark thrift service and Hive impersonation.

Hi,

I have started the Spark thrift service using spark user.

Does each user needs to start own thrift server to use it?

Using beeline i am able to connect to server and execute show tables;

However, when we try to execute a real query, it runs as the spark user and
HDFS permissions do not allow the data to be read.

The query fails with error

0: jdbc:hive2://localhost:1> select count(*) from mytable;
Error: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table 
mytable. java.security.AccessControlException: Permission denied: user=spark, 
access=READ, inode="/data/mytable":tdcprdr:tdcprdr:drwxr-x--x
at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271)


And in thrift server we get log.


In the hive-site.xml we have impersonation enabled.

   
<property>
  <name>hive.server2.enable.doAs</name>
  <value>true</value>
</property>

<property>
  <name>hive.server2.enable.impersonation</name>
  <value>true</value>
</property>

Is there any other configuration to be done for it to work like the normal Hive
thrift server?

Thanks



Re: log4j Spark-worker performance problem

2015-09-30 Thread Akhil Das
Depends how big the lines are, on a typical HDD you can write at max
10-15MB/s, and on SSDs it can be upto 30-40MB/s.

Thanks
Best Regards

On Mon, Sep 28, 2015 at 3:57 PM, vaibhavrtk  wrote:

> Hello
>
> We need a lot of logging for our application: about 1000 lines need to be
> logged per message we process, and we process 1000 msgs/sec. So the total
> to be logged is 1000*1000 lines/sec, and it is going to be written to a
> file. Will writing so many logs impact the processing power of Spark by
> a lot?
> If yes, What can be the alternative?
>
> Note: This much logging is required for the appropriate monitoring of the
> application.
> Let me know if more information is needed.
>
> Thanks
> Vaibhav
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/log4j-Spark-worker-performance-problem-tp24842.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: "Method json([class java.util.HashMap]) does not exist" when reading JSON on PySpark

2015-09-30 Thread Akhil Das
Each JSON doc should be on a single line, I guess.
http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets

Note that the file that is offered as *a json file* is not a typical JSON
file. Each line must contain a separate, self-contained valid JSON object.
As a consequence, a regular multi-line JSON file will most often fail.

Thanks
Best Regards

On Tue, Sep 29, 2015 at 11:07 AM, Fernando Paladini 
wrote:

> Hello guys,
>
> I'm very new to Spark and I'm having some troubles when reading a JSON to
> dataframe on PySpark.
>
> I'm getting a JSON object from an API response and I would like to store
> it in Spark as a DataFrame (I've read that DataFrame is better than RDD,
> that's accurate?). From what I've read in the documentation, I just need to
> call the method sqlContext.read.json in
> order to do what I want.
>
> *Following is the code from my test application:*
> json_object = json.loads(response.text)
> sc = SparkContext("local", appName="JSON to RDD")
> sqlContext = SQLContext(sc)
> dataframe = sqlContext.read.json(json_object)
> dataframe.show()
>
> *The problem is that when I run **"spark-submit myExample.py" I got the
> following error:*
> 15/09/29 01:18:54 INFO BlockManagerMasterEndpoint: Registering block
> manager localhost:48634 with 530.0 MB RAM, BlockManagerId(driver,
> localhost, 48634)
> 15/09/29 01:18:54 INFO BlockManagerMaster: Registered BlockManager
> Traceback (most recent call last):
>   File "/home/paladini/ufxc/lisha/learning/spark-api-kairos/test1.py",
> line 35, in 
> dataframe = sqlContext.read.json(json_object)
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line
> 144, in json
>   File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
> line 538, in __call__
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 36,
> in deco
>   File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line
> 304, in get_return_value
> py4j.protocol.Py4JError: An error occurred while calling o21.json. Trace:
> py4j.Py4JException: Method json([class java.util.HashMap]) does not exist
> at
> py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
> at
> py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
> at py4j.Gateway.invoke(Gateway.java:252)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
> at java.lang.Thread.run(Thread.java:745)
>
> *What I'm doing wrong? *
> Check out this gist to see the JSON I'm trying to load.
>
> Thanks!
> Fernando Paladini
>


Query about checkpointing time

2015-09-30 Thread jatinganhotra
Hi,

I started doing the amp-camp 5 exercises. I tried the following 2 scenarios:

*Scenario #1*
val pagecounts = sc.textFile("data/pagecounts")
pagecounts.checkpoint
pagecounts.count

*Scenario #2*
val pagecounts = sc.textFile("data/pagecounts")
pagecounts.checkpoint

The total time shown in the Spark shell Application UI was different for both
scenarios: scenario #1 took 0.5 seconds, while scenario #2 took only 0.2 s.

*Questions:*
1. In scenario #1, the checkpoint command does nothing by itself; it's neither a
transformation nor an action. It just says that once the RDD materializes
after an action, it should be saved to disk. Am I missing something here?

2. I understand that scenario #1 takes more time because the RDD is
check-pointed (written to disk). Is there a way I can know how much of the
total time was spent on checkpointing?
The Spark shell Application UI shows the following - Scheduler delay, Task
Deserialization time, GC time, Result serialization time, getting result
time. But, doesn't show the breakdown for checkpointing.

3. Is there a way to access the above metrics e.g. scheduler delay, GC time
and save them programmatically? I want to log some of the above metrics for
every action invoked on an RDD.

Please let me know if you need more information.
Thanks
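
On question 3, one hedged sketch (Spark 1.x): a SparkListener can capture
per-task metrics such as GC and (de)serialization time. Note that "scheduler
delay" is derived by the UI rather than stored as a single field, and
checkpointing time is not broken out separately either.

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

class MetricsLogger extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val m = taskEnd.taskMetrics          // may be null for failed tasks
    if (m != null) {
      println(s"stage=${taskEnd.stageId} run=${m.executorRunTime}ms " +
        s"gc=${m.jvmGCTime}ms deser=${m.executorDeserializeTime}ms " +
        s"resultSer=${m.resultSerializationTime}ms")
    }
  }
}

sc.addSparkListener(new MetricsLogger)   // register before running the actions to measure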



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Query-about-checkpointing-time-tp24884.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Partition Column in JDBCRDD or Datasource API

2015-09-30 Thread satish chandra j
HI All,
Please provide your inputs on the partition column to be used in the DataSource
API or JdbcRDD in a scenario where the source table does not have a numeric
column that is sequential and unique, so that proper partitioning can still
take place in Spark.

Regards,
Satish
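
One option to consider (a hedged sketch, Spark 1.4+): the predicates variant of
read.jdbc takes one non-overlapping WHERE clause per partition, so any column
with a known value distribution can drive the partitioning. The URL, table,
column and credentials below are made up for illustration.

import java.util.Properties

val url = "jdbc:postgresql://dbhost:5432/mydb"
val props = new Properties()
props.setProperty("user", "spark")
props.setProperty("password", "secret")

// One Spark partition per predicate; together they should cover the whole table.
val predicates = Array(
  "created_date >= '2015-01-01' AND created_date < '2015-05-01'",
  "created_date >= '2015-05-01' AND created_date < '2015-09-01'",
  "created_date >= '2015-09-01' OR created_date IS NULL"
)

val df = sqlContext.read.jdbc(url, "orders", predicates, props)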


Submitting with --deploy-mode cluster: uploading the jar

2015-09-30 Thread Christophe Schmitz
Hi there,

I am trying to use the "--deploy-mode cluster" option to submit my job
(spark 1.4.1). When I do that, the spark-driver (on the cluster) is looking
for my application jar. I can manually copy my application jar on all the
workers, but I was wondering if there is a way to submit the application
jar when running spark-submit.

Thanks!


Need for advice - performance improvement and out of memory resolution

2015-09-30 Thread Camelia Elena Ciolac
Hello,

I am working on a machine learning project, currently using
spark-1.4.1-bin-hadoop2.6 in local mode on a laptop (Ubuntu 14.04 on a Dell
laptop with an i7-5600 @ 2.6 GHz * 4 cores and 15.6 GB RAM). I should also
mention that I am working in Python from an IPython notebook.


I face the following problem: when working with a DataFrame created from a CSV
file (2.7 GB) with the schema inferred (1900 features), the time it takes for
Spark to count the 145231 rows is 3:30 minutes using 4 cores. Longer times are
recorded for computing one feature's statistics, for example:



START AT: 2015-09-21 08:56:41.136947

+-------+------------------+
|summary|          VAR_1933|
+-------+------------------+
|  count|            145231|
|   mean| 8849.839111484464|
| stddev|3175.7863998269395|
|    min|                 0|
|    max|                  |
+-------+------------------+

FINISH AT: 2015-09-21 09:02:49.452260




So, my first question would be what configuration parameters to set in order to 
improve this performance?

I tried some explicit configuration in the IPython notebook, but specifying 
resources explicitly when creating the Spark configuration resulted in worse 
performance; I mean :

config = SparkConf().setAppName("cleankaggle").setMaster("local[4]") \
    .set("spark.jars", jar_path)

worked twice faster than:

config = SparkConf().setAppName("cleankaggle").setMaster("local[4]") \
    .set("spark.jars", jar_path) \
    .set("spark.driver.memory", "2g") \
    .set("spark.python.worker.memory ", "3g")





Secondly, when I do the one-hot encoding (I tried two different ways of keeping
the results) I don't manage to show the head(1) of the resulting dataframe. We
have the function:

def OHE_transform(categ_feat, df_old):
    outputcolname = categ_feat + "_ohe_index"
    outputcolvect = categ_feat + "_ohe_vector"
    stringIndexer = StringIndexer(inputCol=categ_feat, outputCol=outputcolname)
    indexed = stringIndexer.fit(df_old).transform(df_old)
    encoder = OneHotEncoder(inputCol=outputcolname, outputCol=outputcolvect)
    encoded = encoder.transform(indexed)
    return encoded


The two manners for keeping results are depicted below:

1)

result = OHE_transform(list_top_feat[0], df_top_categ)
for item in list_top_feat[1:]:
    result = OHE_transform(item, result)
result.head(1)


2)

df_result = OHE_transform("VAR_A", df_top_categ)
df_result_1 = OHE_transform("VAR_B", df_result)
df_result_2 = OHE_transform("VAR_C", df_result_1)
...
df_result_12 = OHE_transform("VAR_X", df_result_11)
df_result_12.head(1)

In the first approach, at the third iteration (in the for loop), when it was
supposed to print the head(1), the IPython notebook remained in the state
"Kernel busy" for several hours and then I interrupted the kernel.
The second approach managed to go through all the transformations (please note
that here I eliminated the intermediary prints of head(1)), but it gave an "out
of memory" error at the only (final result) head(1), which I paste below:

===


df_result_12.head(1)

---
Py4JJavaError Traceback (most recent call last)
 in ()
> 1 df_result_12.head(1)

/home/camelia/spark-1.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in 
head(self, n)
649 rs = self.head(1)
650 return rs[0] if rs else None
--> 651 return self.take(n)
652
653 @ignore_unicode_prefix

/home/camelia/spark-1.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in 
take(self, num)
305 [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
306 """
--> 307 return self.limit(num).collect()
308
309 @ignore_unicode_prefix

/home/camelia/spark-1.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in 
collect(self)
279 """
280 with SCCallSiteSync(self._sc) as css:
--> 281 port = 
self._sc._jvm.PythonRDD.collectAndServe(self._jdf.javaToPython().rdd())
282 rs = list(_load_from_socket(port, 
BatchedSerializer(PickleSerializer(
283 cls = _create_cls(self.schema)

/home/camelia/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
 in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:

/home/camelia/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py
 in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
    301

Lost leader exception in Kafka Direct for Streaming

2015-09-30 Thread swetha

Hi,

I see this sometimes in our Kafka Direct approach in our Streaming job. How
do we make sure that the job recovers from such errors and works normally
thereafter?

15/09/30 05:14:18 ERROR KafkaRDD: Lost leader for topic x_stream partition
19,  sleeping for 200ms
15/09/30 05:14:18 ERROR KafkaRDD: Lost leader for topic x_stream partition
5,  sleeping for 200ms

Followed by every task failing with something like this:

15/09/30 05:26:20 ERROR Executor: Exception in task 4.0 in stage 84281.0
(TID 818804)
kafka.common.NotLeaderForPartitionException

And:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 15
in stage 84958.0 failed 4 times, most recent failure: Lost task 15.3 in
stage 84958.0 (TID 819461, 10.227.68.102): java.lang.AssertionError:
assertion failed: Beginning offset 15027734702 is after the ending offset
15027725493 for topic hubble_stream partition 12. You either provided an
invalid fromOffset, or the Kafka topic has been damaged


Thanks,
Swetha




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Lost-leader-exception-in-Kafka-Direct-for-Streaming-tp24891.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



What is the best way to submit multiple tasks?

2015-09-30 Thread Saif.A.Ellafi
Hi all,

I have a process where I do some calculations on each one of the columns of a
dataframe.
Intrinsically, I go over each column with a for loop. On the other hand, each
per-column process is not entirely distributable on its own.

To speed up the process, I would like to submit a Spark job for each column;
any suggestions? I was thinking of plain threads sharing a single SparkContext.

Thank you,
Saif
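
A hedged sketch in Scala of the shared-context idea: the Spark scheduler
accepts jobs from multiple threads, so one Future per column can run the
per-column jobs concurrently. The `df` value and the per-column computation
are placeholders for the real logic.

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Placeholder per-column computation; each call becomes one or more Spark jobs.
def runPerColumn(col: String): (String, Long) =
  (col, df.select(col).na.drop().count())

val futures = df.columns.toSeq.map(c => Future(runPerColumn(c)))
val results = Await.result(Future.sequence(futures), Duration.Inf)
results.foreach { case (c, n) => println(s"$c -> $n") }

// If the concurrent jobs should share executors fairly instead of FIFO,
// spark.scheduler.mode=FAIR may be worth a look.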



[streaming] reading Kafka direct stream throws kafka.common.OffsetOutOfRangeException

2015-09-30 Thread Alexey Ponkin
Hi

I have a simple spark-streaming job (8 executors with 1 core each, on an 8-node
cluster) that reads from a Kafka topic (3 brokers with 8 partitions) and saves
to Cassandra.
The problem is that when I increase the number of incoming messages in the
topic, the job starts to fail with kafka.common.OffsetOutOfRangeException.
The job fails starting from 100 events per second.

Thanks in advance

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Combine key-value pair in spark java

2015-09-30 Thread Ramkumar V
Hi,

I have a key-value pair JavaRDD (JavaPairRDD rdd), but I want to concatenate
each pair into one RDD of String (JavaRDD result).

How can I do that? What do I have to use (map, flatMap)? Can anyone please
give me the syntax for this in Java?

*Thanks*,



Re: Combine key-value pair in spark java

2015-09-30 Thread Andy Dang
You should be able to use a simple mapping:

rdd.map(tuple -> tuple._1() + tuple._2())

---
Regards,
Andy (Nam) Dang

On Wed, Sep 30, 2015 at 10:34 AM, Ramkumar V 
wrote:

> Hi,
>
> I have key value pair of JavaRDD (JavaPairRDD rdd) but i
> want to concatenate into one RDD String (JavaRDD result ).
>
> How can i do that ? What i have to use (map,flatmap)? can anyone please
> give me the syntax for this in java ?
>
> *Thanks*,
> 
>
>


Spark Streaming Standalone 1.5 - Stage cancelled because SparkContext was shut down

2015-09-30 Thread tranan
Hello All,

I have several Spark Streaming applications running in standalone mode on
Spark 1.5.  Spark is currently set up for dynamic resource allocation.  I can
have about 12 Spark Streaming jobs running concurrently.  Occasionally I see
more than half of them fail due to "Stage cancelled because SparkContext was
shut down".  They restart automatically since they run in supervised mode.
Attached is the screenshot of one of
the jobs that failed.  Anyone have any insight as to what is going on?

 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Standalone-1-5-Stage-cancelled-because-SparkContext-was-shut-down-tp24885.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark thrift service and Hive impersonation.

2015-09-30 Thread Vinay Shukla
Steve is right,
The Spark thrift server does not propagate end-user identity downstream
yet.



On Wednesday, September 30, 2015, Steve Loughran 
wrote:

>
> On 30 Sep 2015, at 03:24, Mohammed Guller  > wrote:
>
> Does each user needs to start own thrift server to use it?
>
>
>
> No. One of the benefits of the Spark Thrift Server is that it allows
> multiple users to share a single SparkContext.
>
>
>
> Most likely, you have file permissions issue.
>
>
>
>
> I don't think the spark hive thrift server does the multi-user stuff (yet)
>
> Mohammed
>
>
>
> *From:* Jagat Singh [mailto:jagatsi...@gmail.com
> ]
> *Sent:* Tuesday, September 29, 2015 5:30 PM
> *To:* SparkUser
> *Subject:* Spark thrift service and Hive impersonation.
>
>
>
> Hi,
>
>
>
> I have started the Spark thrift service using spark user.
>
>
>
> Does each user needs to start own thrift server to use it?
>
>
>
> Using beeline i am able to connect to server and execute show tables;
>
>
>
> However when we try to execute some real query it runs as spark user and
> HDFS permissions does not allow them to be read.
>
>
>
> The query fails with error
>
>
>
> 0: jdbc:hive2://localhost:1> select count(*) from mytable;
>
> Error: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch
> table mytable. java.security.AccessControlException: Permission denied:
> user=spark, access=READ, inode="/data/mytable":tdcprdr:tdcprdr:drwxr-x--x
>
> at
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271)
>
>
>
>
>
> And in thrift server we get log.
>
>
>
>
>
> In the hive-site.xml we have impersonation enabled.
>
>
>
>
>
> <property>
>   <name>hive.server2.enable.doAs</name>
>   <value>true</value>
> </property>
>
> <property>
>   <name>hive.server2.enable.impersonation</name>
>   <value>true</value>
> </property>
>
>
>
>
> Is there any other configuration to be done for it to work like normal
> hive thrift server.
>
>
>
> Thanks
>
>
>


Spark Streaming

2015-09-30 Thread Amith sha
Hi All,
I am planning to handle streaming data from Kafka to Spark using Python code.
Earlier, with my own log files, I handled them in Spark using the token index.
But in the case of an Apache log I cannot rely on the index, because when
splitting on whitespace the index positions get lost. So:
Is it possible to use a regex inside transform on the RDD?
OR
Are there any other possible ways to get the different groups?
ex:-
THIS IS THE APACHE LOG

[u'10.10.80.1', u'-', u'-', u'[08/Sep/2015:12:15:15', u'+0530]',
u'"GET', u'/', u'HTTP/1.1"', u'200', u'1213', u'"-"', u'"Mozilla/5.0',
u'(Windows', u'NT', u'10.0;', u'WOW64)', u'AppleWebKit/537.36',
u'(KHTML,', u'like', u'Gecko)', u'Chrome/45.0.2454.85',
u'Safari/537.36"']

I NEED LIKE THIS
IP:10.10.80.1
IDENTITY:  -
USER:  -
TIME:08/Sep/2015:12:15:15 +0530
SERVER MESSAGE:   GET /favicon.ico HTTP/1.1
STATUS:404
SIZE:514
REFERER:http://.com/
CLIENT MESSAGE:
Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like
Gecko) Chrome/45.0.2454.85 Safari/537.36





Thanks & Regards
Amithsha
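
A hedged sketch of the regex approach in Scala (the same pattern can be used
from Python's re module inside DStream.transform); the pattern targets the
combined log format and the field names are illustrative:

import org.apache.spark.streaming.dstream.DStream

// Combined log format: ip identity user [time] "request" status size "referer" "agent"
val logPattern =
  """^(\S+) (\S+) (\S+) \[([^\]]+)\] "([^"]*)" (\d{3}) (\S+) "([^"]*)" "([^"]*)"$""".r

case class AccessLog(ip: String, identity: String, user: String, time: String,
                     request: String, status: Int, size: String,
                     referer: String, agent: String)

def parse(line: String): Option[AccessLog] = line match {
  case logPattern(ip, id, user, time, req, status, size, ref, agent) =>
    Some(AccessLog(ip, id, user, time, req, status.toInt, size, ref, agent))
  case _ => None   // malformed line: drop it (or route it elsewhere)
}

// e.g. on a DStream[String] of raw lines coming from Kafka:
def parsed(lines: DStream[String]): DStream[AccessLog] =
  lines.transform(rdd => rdd.flatMap(line => parse(line).toList))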

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



sc.parallelize with defaultParallelism=1

2015-09-30 Thread Nicolae Marasoiu
Hi,


When calling sc.parallelize(data,1), is there a preference where to put the 
data? I see 2 possibilities: sending it to a worker node, or keeping it on the 
driver program.


I would prefer to keep the data local to the driver. The use case is when I 
need just to load a bit of data from HBase, and then compute over it e.g. 
aggregate, using Spark.


Thanks,

Nicu


Re: partition recomputation in big lineage RDDs

2015-09-30 Thread Nicolae Marasoiu
Hi,

In fact, my RDD will get a new version (a new RDD assigned to the same var)
quite frequently, by merging bulks of 1000 events from the last 10s.

But recomputation would be more efficient if done not by reading the initial RDD
partition(s) and reapplying deltas, but by reading the latest data from HBase
and just computing on top of that, if anything.

Basically I guess I need to write my own RDD and implement compute method by 
sliding on hbase.

Thanks,
Nicu

From: Nicolae Marasoiu 
Sent: Wednesday, September 30, 2015 3:05 PM
To: user@spark.apache.org
Subject: partition recomputation in big lineage RDDs


Hi,


If I implement a manner to have an up-to-date version of my RDD by ingesting 
some new events, called RDD_inc (from increment), and I provide a "merge" 
function m(RDD, RDD_inc), which returns the RDD_new, it looks like I can evolve 
the state of my RDD by constructing new RDDs all the time, and doing it in a 
manner that hopes to reuse as much data from the past RDD and make the rest 
garbage collectable. An example merge function would be a join on some ids, and 
creating a merged state for each element. The type of the result of m(RDD, 
RDD_inc) is the same type as that of RDD.


My question on this is how does the recomputation work for such an RDD, which 
is not the direct result of hdfs load, but is the result of a long lineage of 
such functions/transformations:


Lets say my RDD is now after 2 merge iterations like this:

RDD_new = merge(merge(RDD, RDD_inc1), RDD_inc2)


When recomputing a part of RDD_new here are my assumptions:

- only full partitions are recomputed, nothing more granular?

- the corresponding partitions of RDD, RDD_inc1 and RDD_inc2 are recomputed

- the functions are applied


And this seems more simplistic, since the partitions do not fully align in the 
general case between all these RDDs. The other aspect is the potentially 
redundant load of data which is in fact not required anymore (the data ruled 
out in the merge).


A more detailed version of this question is at 
https://www.quora.com/How-does-Spark-RDD-recomputation-avoids-duplicate-loading-or-computation/


Thanks,

Nicu


Re: Combine key-value pair in spark java

2015-09-30 Thread Ramkumar V
Thanks, man. It works for me.

*Thanks*,



On Wed, Sep 30, 2015 at 4:31 PM, Andy Dang  wrote:

> You should be able to use a simple mapping:
>
> rdd.map(tuple -> tuple._1() + tuple._2())
>
> ---
> Regards,
> Andy (Nam) Dang
>
> On Wed, Sep 30, 2015 at 10:34 AM, Ramkumar V 
> wrote:
>
>> Hi,
>>
>> I have key value pair of JavaRDD (JavaPairRDD rdd) but i
>> want to concatenate into one RDD String (JavaRDD result ).
>>
>> How can i do that ? What i have to use (map,flatmap)? can anyone please
>> give me the syntax for this in java ?
>>
>> *Thanks*,
>> 
>>
>>
>


Re: How to tell Spark not to use /tmp for snappy-unknown-***-libsnappyjava.so

2015-09-30 Thread Ted Yu
See the tail of this:
https://bugzilla.redhat.com/show_bug.cgi?id=1005811

FYI 
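
Following that pointer, one hedged possibility is to move the JVM temp
directory (and snappy-java's own override) for both driver and executors, e.g.
in spark-defaults.conf; java.io.tmpdir and org.xerial.snappy.tempdir are
JVM/snappy-java properties rather than Spark settings, and /data/tmp is just an
example path, so please verify against your snappy-java version:

spark.driver.extraJavaOptions    -Djava.io.tmpdir=/data/tmp -Dorg.xerial.snappy.tempdir=/data/tmp
spark.executor.extraJavaOptions  -Djava.io.tmpdir=/data/tmp -Dorg.xerial.snappy.tempdir=/data/tmp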

> On Sep 30, 2015, at 5:54 AM, Dmitry Goldenberg  
> wrote:
> 
> Is there a way to ensure Spark doesn't write to /tmp directory?
> 
> We've got spark.local.dir specified in the spark-defaults.conf file to point 
> at another directory.  But we're seeing many of these 
> snappy-unknown-***-libsnappyjava.so files being written to /tmp still.
> 
> Is there a config setting or something that would cause Spark to use another 
> directory of our choosing?
> 
> Thanks.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to tell Spark not to use /tmp for snappy-unknown-***-libsnappyjava.so

2015-09-30 Thread Dmitry Goldenberg
Thanks, Ted, will try it out.

On Wed, Sep 30, 2015 at 9:07 AM, Ted Yu  wrote:

> See the tail of this:
> https://bugzilla.redhat.com/show_bug.cgi?id=1005811
>
> FYI
>
> > On Sep 30, 2015, at 5:54 AM, Dmitry Goldenberg 
> wrote:
> >
> > Is there a way to ensure Spark doesn't write to /tmp directory?
> >
> > We've got spark.local.dir specified in the spark-defaults.conf file to
> point at another directory.  But we're seeing many of these
> snappy-unknown-***-libsnappyjava.so files being written to /tmp still.
> >
> > Is there a config setting or something that would cause Spark to use
> another directory of our choosing?
> >
> > Thanks.
>


How to tell Spark not to use /tmp for snappy-unknown-***-libsnappyjava.so

2015-09-30 Thread Dmitry Goldenberg
Is there a way to ensure Spark doesn't write to /tmp directory?

We've got spark.local.dir specified in the spark-defaults.conf file to
point at another directory.  But we're seeing many of
these snappy-unknown-***-libsnappyjava.so files being written to /tmp still.

Is there a config setting or something that would cause Spark to use
another directory of our choosing?

Thanks.


partition recomputation in big lineage RDDs

2015-09-30 Thread Nicolae Marasoiu
Hi,


If I implement a manner to have an up-to-date version of my RDD by ingesting 
some new events, called RDD_inc (from increment), and I provide a "merge" 
function m(RDD, RDD_inc), which returns the RDD_new, it looks like I can evolve 
the state of my RDD by constructing new RDDs all the time, and doing it in a 
manner that hopes to reuse as much data from the past RDD and make the rest 
garbage collectable. An example merge function would be a join on some ids, and 
creating a merged state for each element. The type of the result of m(RDD, 
RDD_inc) is the same type as that of RDD.


My question on this is how does the recomputation work for such an RDD, which 
is not the direct result of hdfs load, but is the result of a long lineage of 
such functions/transformations:


Lets say my RDD is now after 2 merge iterations like this:

RDD_new = merge(merge(RDD, RDD_inc1), RDD_inc2)


When recomputing a part of RDD_new here are my assumptions:

- only full partitions are recomputed, nothing more granular?

- the corresponding partitions of RDD, RDD_inc1 and RDD_inc2 are recomputed

- the functions are applied


And this seems more simplistic, since the partitions do not fully align in the 
general case between all these RDDs. The other aspect is the potentially 
redundant load of data which is in fact not required anymore (the data ruled 
out in the merge).


A more detailed version of this question is at 
https://www.quora.com/How-does-Spark-RDD-recomputation-avoids-duplicate-loading-or-computation/


Thanks,

Nicu


Problem understanding spark word count execution

2015-09-30 Thread Kartik Mathur
Hi All,

I tried running spark word count and I have couple of questions -

I am analyzing stage 0 , i.e
 *sc.textFile -> flatMap -> Map (Word count example)*

1) In the *Stage logs* under the Application UI details, for every task I am
seeing a Shuffle Write of 2.7 KB. *Question: how can I know where this task
wrote to, i.e. how many bytes went to which executor?*

2) In the executor's log, when I look at the same task, it says 2000 bytes of
result were sent to the driver. My question is: *if the results were sent
directly to the driver, what is this shuffle write?*

Thanks,
Kartik
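For reference, a hedged sketch of the usual word-count pipeline: as far as I understand it, stage 0 ends at the reduceByKey boundary, so the shuffle write reported for a stage 0 task is the map-side output written to that executor's local disk for stage 1 tasks to fetch, while the ~2 KB "result sent to driver" is only the task's status and metrics, not the counts themselves. Paths are placeholders:

// Classic word count; reduceByKey introduces the stage boundary.
val counts = sc.textFile("hdfs:///tmp/input")   // placeholder path
  .flatMap(line => line.split("\\s+"))          // narrow: stays in stage 0
  .map(word => (word, 1))                       // narrow: stays in stage 0
  .reduceByKey(_ + _)                           // shuffle: stage 0 writes, stage 1 reads

counts.saveAsTextFile("hdfs:///tmp/output")     // placeholder path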


Re: Submitting with --deploy-mode cluster: uploading the jar

2015-09-30 Thread Christophe Schmitz
Hi Saisai

I am using this command:
spark-submit --deploy-mode cluster --properties-file file.conf --class
myclass test-assembly-1.0.jar

The application starts only if I manually copy test-assembly-1.0.jar to all
the workers (or the master, I don't remember) and provide the full path of
the file.

On the other hand with --deploy-mode client I don't need to do that, but
then I need to accept incoming connection in my client to serve the jar
(which is not possible behind a firewall I don't control)

Thanks,

Christophe


On Wed, Sep 30, 2015 at 5:19 PM, Saisai Shao  wrote:

> As I remembered you don't need to upload application jar manually, Spark
> will do it for you when you use Spark submit. Would you mind posting out
> your command of Spark submit?
>
>
> On Wed, Sep 30, 2015 at 3:13 PM, Christophe Schmitz 
> wrote:
>
>> Hi there,
>>
>> I am trying to use the "--deploy-mode cluster" option to submit my job
>> (spark 1.4.1). When I do that, the spark-driver (on the cluster) is looking
>> for my application jar. I can manually copy my application jar on all the
>> workers, but I was wondering if there is a way to submit the application
>> jar when running spark-submit.
>>
>> Thanks!
>>
>
>


Kafka Direct Stream

2015-09-30 Thread Udit Mehta
Hi,

I am using the Spark direct stream to consume from multiple topics in Kafka. I
am able to consume fine, but I am stuck on how to separate the data for each
topic, since I need to process the data differently depending on the topic.
I basically want to split the RDD spanning N topics into N RDDs, each holding
one topic.

Any help would be appreciated.

Thanks in advance,
Udit
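One hedged way to do this with the direct stream: each RDD partition of a direct stream corresponds to exactly one Kafka topic-partition, so the topic can be recovered from the offset ranges and used to split out per-topic streams. Topic names, Kafka params and the String decoders below are placeholders, and ssc is assumed to be an existing StreamingContext:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val topics = Set("topicA", "topicB")                              // placeholders
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")   // placeholder

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

// Tag every record with the topic it came from; partition i maps to offsetRanges(i).
val tagged = stream.transform { rdd =>
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.mapPartitionsWithIndex { (i, iter) =>
    val topic = ranges(i).topic
    iter.map { case (_, value) => (topic, value) }
  }
}

// One filtered DStream per topic, each processed differently downstream.
val streamA = tagged.filter(_._1 == "topicA").map(_._2)
val streamB = tagged.filter(_._1 == "topicB").map(_._2)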


Worker node timeout exception

2015-09-30 Thread markluk
I set up a new Spark cluster. My worker node is dying with the following
exception.

Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[120 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcEnv.scala:241)
... 11 more


Any ideas what's wrong? This is happening both for a spark program and spark
shell. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Worker-node-timeout-exception-tp24893.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
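A hedged sketch of one knob that matches this particular 120-second future timeout: several internal ask/heartbeat timeouts default to spark.network.timeout, so raising it can buy headroom while the real cause (long GC pauses, network problems, an overloaded master) is investigated. The value below is a placeholder:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("timeout-tuning-example")
  // spark.rpc.askTimeout and similar settings fall back to this value when unset.
  .set("spark.network.timeout", "300s")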




Re: "Method json([class java.util.HashMap]) does not exist" when reading JSON on PySpark

2015-09-30 Thread Michael Armbrust
I think the problem here is that you are passing in parsed JSON that is stored
as a dictionary (which is converted to a hashmap when going into the JVM).
You should instead be passing in the path to the JSON file (formatted as
Akhil suggests) so that Spark can do the parsing in parallel.  The other
option would be to construct an RDD of JSON strings and pass that to the
json() method.
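For illustration, a hedged Scala sketch of the two working options described above (PySpark's DataFrameReader behaves the same way); the path and the sample records are placeholders:

// Option 1: hand Spark the path and let it parse in parallel
// (one self-contained JSON object per line).
val fromFile = sqlContext.read.json("/path/to/response.json")

// Option 2: build an RDD of raw JSON strings and pass that in.
val jsonStrings = sc.parallelize(Seq(
  """{"name": "sensor-1", "value": 12.3}""",
  """{"name": "sensor-2", "value": 45.6}"""))
val fromRdd = sqlContext.read.json(jsonStrings)

fromRdd.show()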

On Wed, Sep 30, 2015 at 2:28 AM, Akhil Das 
wrote:

> Each Json Doc should be in a single line i guess.
> http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets
>
> Note that the file that is offered as *a json file* is not a typical JSON
> file. Each line must contain a separate, self-contained valid JSON object.
> As a consequence, a regular multi-line JSON file will most often fail.
>
> Thanks
> Best Regards
>
> On Tue, Sep 29, 2015 at 11:07 AM, Fernando Paladini 
> wrote:
>
>> Hello guys,
>>
>> I'm very new to Spark and I'm having some troubles when reading a JSON to
>> dataframe on PySpark.
>>
>> I'm getting a JSON object from an API response and I would like to store
>> it in Spark as a DataFrame (I've read that DataFrame is better than RDD,
>> that's accurate?). For what I've read
>> 
>> on documentation, I just need to call the method sqlContext.read.json in
>> order to do what I want.
>>
>> *Following is the code from my test application:*
>> json_object = json.loads(response.text)
>> sc = SparkContext("local", appName="JSON to RDD")
>> sqlContext = SQLContext(sc)
>> dataframe = sqlContext.read.json(json_object)
>> dataframe.show()
>>
>> *The problem is that when I run **"spark-submit myExample.py" I got the
>> following error:*
>> 15/09/29 01:18:54 INFO BlockManagerMasterEndpoint: Registering block
>> manager localhost:48634 with 530.0 MB RAM, BlockManagerId(driver,
>> localhost, 48634)
>> 15/09/29 01:18:54 INFO BlockManagerMaster: Registered BlockManager
>> Traceback (most recent call last):
>>   File "/home/paladini/ufxc/lisha/learning/spark-api-kairos/test1.py",
>> line 35, in 
>> dataframe = sqlContext.read.json(json_object)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py",
>> line 144, in json
>>   File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
>> line 538, in __call__
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 36,
>> in deco
>>   File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
>> line 304, in get_return_value
>> py4j.protocol.Py4JError: An error occurred while calling o21.json. Trace:
>> py4j.Py4JException: Method json([class java.util.HashMap]) does not exist
>> at
>> py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
>> at
>> py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
>> at py4j.Gateway.invoke(Gateway.java:252)
>> at
>> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>> at py4j.GatewayConnection.run(GatewayConnection.java:207)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> *What I'm doing wrong? *
>> Check out this gist
>>  to see the JSON
>> I'm trying to load.
>>
>> Thanks!
>> Fernando Paladini
>>
>
>


Re: Submitting with --deploy-mode cluster: uploading the jar

2015-09-30 Thread Saisai Shao
Are you running in standalone deploy mode, and what Spark version are you
running?

Can you explain a little more specifically what exception occurs and how you
provide the jar to Spark?

I tried in my local machine with command:

./bin/spark-submit --verbose --master spark://hw12100.local:7077
--deploy-mode cluster --class org.apache.spark.examples.SparkPi
examples/target/scala-2.10/spark-examples-1.5.1-hadoop2.7.1.jar

Seems Spark will upload this examples jar automatically, don't need to
handle it manually.

Thanks
Saisai



On Thu, Oct 1, 2015 at 8:36 AM, Christophe Schmitz 
wrote:

> Hi Saisai
>
> I am using this command:
> spark-submit --deploy-mode cluster --properties-file file.conf --class
> myclass test-assembly-1.0.jar
>
> The application starts only if I manually copy test-assembly-1.0.jar to all
> the workers (or the master, I don't remember) and provide the full path of
> the file.
>
> On the other hand with --deploy-mode client I don't need to do that, but
> then I need to accept incoming connection in my client to serve the jar
> (which is not possible behind a firewall I don't control)
>
> Thanks,
>
> Christophe
>
>
> On Wed, Sep 30, 2015 at 5:19 PM, Saisai Shao 
> wrote:
>
>> As I remembered you don't need to upload application jar manually, Spark
>> will do it for you when you use Spark submit. Would you mind posting out
>> your command of Spark submit?
>>
>>
>> On Wed, Sep 30, 2015 at 3:13 PM, Christophe Schmitz 
>> wrote:
>>
>>> Hi there,
>>>
>>> I am trying to use the "--deploy-mode cluster" option to submit my job
>>> (spark 1.4.1). When I do that, the spark-driver (on the cluster) is looking
>>> for my application jar. I can manually copy my application jar on all the
>>> workers, but I was wondering if there is a way to submit the application
>>> jar when running spark-submit.
>>>
>>> Thanks!
>>>
>>
>>
>


Re: Problem understanding spark word count execution

2015-09-30 Thread Nicolae Marasoiu

Hi,

2 - the end results are sent back to the driver; the shuffles are transmissions
of intermediate results between nodes at stage boundaries (the "->" steps above
are all intermediate transformations).

More precisely, since flatMap and map are narrow dependencies, meaning they can
usually run on the local node, I bet the shuffle is just sending out the
textFile partitions to a few nodes to distribute them.



From: Kartik Mathur 
Sent: Thursday, October 1, 2015 12:42 AM
To: user
Subject: Problem understanding spark word count execution

Hi All,

I tried running the Spark word count example and I have a couple of questions -

I am analyzing stage 0, i.e.
 sc.textFile -> flatMap -> Map (Word count example)

1) In the Stage logs under the Application UI details, for every task I am seeing
a Shuffle write of 2.7 KB. Question - how can I tell where this task wrote its
output? e.g. how many bytes went to which executor?

2) In the executor's log, when I look at the same task, it says 2000 bytes of
result were sent to the driver. My question is: if the results were sent directly
to the driver, what is this shuffle write?

Thanks,
Kartik



Re: [streaming] reading Kafka direct stream throws kafka.common.OffsetOutOfRangeException

2015-09-30 Thread Cody Koeninger
Offset out of range means the message in question is no longer available on
Kafka.  What's your kafka log retention set to, and how does that compare
to your processing time?

On Wed, Sep 30, 2015 at 4:26 AM, Alexey Ponkin  wrote:

> Hi
>
> I have a simple spark-streaming job (8 executors, 1 core each, on an 8-node
> cluster) - read from a Kafka topic (3 brokers with 8 partitions) and save to
> Cassandra. The problem is that when I increase the number of incoming
> messages in the topic, the job starts to fail with
> kafka.common.OffsetOutOfRangeException. The job fails starting from 100
> events per second.
>
> Thanks in advance
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
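A hedged sketch of one mitigation discussed here: cap the per-partition ingest rate so each batch stays small enough to finish inside the batch interval and offsets are consumed before they age out of Kafka's retention window. The numbers are placeholders, and lengthening Kafka's log retention is the other half of the fix:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kafka-rate-limit-example")
  // Direct stream: read at most 1000 records per Kafka partition per second.
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
  // Available from Spark 1.5: let Spark adapt the rate to the processing speed.
  .set("spark.streaming.backpressure.enabled", "true")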


Re: RandomForestClassifier does not recognize number of classes, nor can number of classes be set

2015-09-30 Thread Yanbo Liang
Hi Kristina,

Currently StringIndexer is a required step before training DecisionTree,
RandomForest and GBT related models.
Though it is not necessary for other models such as LogisticRegression and
NaiveBayes, it is still strongly recommended to apply this preprocessing step;
otherwise it may lead to an incorrect model.
SPARK-7126  focuses on indexing
labels automatically, so it will no longer be necessary to run StringIndexer
explicitly once that JIRA is resolved.

BR
Yanbo

2015-09-29 22:14 GMT+08:00 Kristina Rogale Plazonic :

> Hi,
>
> I'm trying out the ml.classification.RandomForestClassifer() on a simple
> dataframe and it returns an exception that number of classes has not been
> set in my dataframe. However, I cannot find a function that would set
> number of classes, or pass it as an argument anywhere. In mllib, numClasses
> is a parameter passed when training the model. In ml, there is an ugly hack
> using StringIndexer, but should you really be using the hack?
> LogisticRegression and NaiveBayes in ml work without setting the number of
> classes.
>
> Thanks for any pointers!
> Kristina
>
> My code:
>
> import org.apache.spark.mllib.linalg.{Vector, Vectors}
>
> case class Record(label:Double,
> features:org.apache.spark.mllib.linalg.Vector)
>
> val df = sc.parallelize(Seq( Record(0.0, Vectors.dense(1.0, 0.0) ),
> Record(0.0, Vectors.dense(1.1, 0.0) ),
> Record(0.0, Vectors.dense(1.2, 0.0) ),
> Record(1.0, Vectors.dense(0.0, 1.2) ),
> Record(1.0, Vectors.dense(0.0, 1.3) ),
> Record(1.0, Vectors.dense(0.0, 1.7) ))
>).toDF()
>
> val rf = new RandomForestClassifier()
> val rfmodel = rf.fit(df)
>
> And the error is:
>
> scala> val rfmodel = rf.fit(df)
> java.lang.IllegalArgumentException: RandomForestClassifier was given input
> with invalid label column label, without the number of classes specified.
> See StringIndexer.
> at
> org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:87)
> at
> org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:42)
> at org.apache.spark.ml.Predictor.fit(Predictor.scala:90)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:31)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:36)
>
>
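A hedged sketch of the StringIndexer preprocessing referred to above, applied to the DataFrame built in the question; StringIndexer attaches class metadata to the new label column, which is what RandomForestClassifier reads. Column names are illustrative and df is the DataFrame from the question:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.StringIndexer

// Index the label column; this records the number of classes in its metadata.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")

val rf = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("features")

val pipeline = new Pipeline().setStages(Array(labelIndexer, rf))
val rfmodel = pipeline.fit(df)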


Re: ThrowableSerializationWrapper: Task exception could not be deserialized / ClassNotFoundException: org.apache.solr.common.SolrException

2015-09-30 Thread Ted Yu
bq. have tried these settings with the hbase protocol jar, to no avail

In that case, HBaseZeroCopyByteString is contained in hbase-protocol.jar.
In HBaseZeroCopyByteString , you can see:

package com.google.protobuf;  // This is a lie.

If protobuf jar is loaded ahead of hbase-protocol.jar, things start to get
interesting ...

On Tue, Sep 29, 2015 at 6:12 PM, Dmitry Goldenberg  wrote:

> Ted, I think I have tried these settings with the hbase protocol jar, to
> no avail.
>
> I'm going to see if I can try and use these with this SolrException issue
> though it now may be harder to reproduce it. Thanks for the suggestion.
>
> On Tue, Sep 29, 2015 at 8:03 PM, Ted Yu  wrote:
>
>> Have you tried the following ?
>> --conf spark.driver.userClassPathFirst=true --conf spark.executor.
>> userClassPathFirst=true
>>
>> On Tue, Sep 29, 2015 at 4:38 PM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> Release of Spark: 1.5.0.
>>>
>>> Command line invocation:
>>>
>>> ACME_INGEST_HOME=/mnt/acme/acme-ingest
>>> ACME_INGEST_VERSION=0.0.1-SNAPSHOT
>>> ACME_BATCH_DURATION_MILLIS=5000
>>> SPARK_MASTER_URL=spark://data1:7077
>>> JAVA_OPTIONS="-Dspark.streaming.kafka.maxRatePerPartition=1000"
>>> JAVA_OPTIONS="$JAVA_OPTIONS -Dspark.executor.memory=2g"
>>>
>>> $SPARK_HOME/bin/spark-submit \
>>> --driver-class-path  $ACME_INGEST_HOME \
>>> --driver-java-options "$JAVA_OPTIONS" \
>>> --class
>>> "com.acme.consumer.kafka.spark.KafkaSparkStreamingDriver" \
>>> --master $SPARK_MASTER_URL  \
>>> --conf
>>> "spark.executor.extraClassPath=$ACME_INGEST_HOME/conf:$ACME_INGEST_HOME/lib/hbase-protocol-0.98.9-hadoop2.jar"
>>> \
>>>
>>> $ACME_INGEST_HOME/lib/acme-ingest-kafka-spark-$ACME_INGEST_VERSION.jar \
>>> -brokerlist $METADATA_BROKER_LIST \
>>> -topic acme.topic1 \
>>> -autooffsetreset largest \
>>> -batchdurationmillis $ACME_BATCH_DURATION_MILLIS \
>>> -appname Acme.App1 \
>>> -checkpointdir file://$SPARK_HOME/acme/checkpoint-acme-app1
>>> Note that SolrException is definitely in our consumer jar
>>> acme-ingest-kafka-spark-$ACME_INGEST_VERSION.jar which gets deployed to
>>> $ACME_INGEST_HOME.
>>>
>>> For the extraClassPath on the executors, we've got additionally
>>> hbase-protocol-0.98.9-hadoop2.jar: we're using Apache Phoenix from the
>>> Spark jobs to communicate with HBase.  The only way to force Phoenix to
>>> successfully communicate with HBase was to have that JAR explicitly added
>>> to the executor classpath regardless of the fact that the contents of the
>>> hbase-protocol hadoop jar get rolled up into the consumer jar at build time.
>>>
>>> I'm starting to wonder whether there's some class loading pattern here
>>> where some classes may not get loaded out of the consumer jar and therefore
>>> have to have their respective jars added to the executor extraClassPath?
>>>
>>> Or is this a serialization problem for SolrException as Divya
>>> Ravichandran suggested?
>>>
>>>
>>>
>>>
>>> On Tue, Sep 29, 2015 at 6:16 PM, Ted Yu  wrote:
>>>
 Mind providing a bit more information:

 release of Spark
 command line for running Spark job

 Cheers

 On Tue, Sep 29, 2015 at 1:37 PM, Dmitry Goldenberg <
 dgoldenberg...@gmail.com> wrote:

> We're seeing this occasionally. Granted, this was caused by a wrinkle
> in the Solr schema but this bubbled up all the way in Spark and caused job
> failures.
>
> I just checked and SolrException class is actually in the consumer job
> jar we use.  Is there any reason why Spark cannot find the SolrException
> class?
>
> 15/09/29 15:41:58 WARN ThrowableSerializationWrapper: Task exception
> could not be deserialized
> java.lang.ClassNotFoundException: org.apache.solr.common.SolrException
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at
> org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:163)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> 

RE: Need for advice - performance improvement and out of memory resolution

2015-09-30 Thread Ewan Leith
Try reducing the number of workers to 2, and increasing their memory up to 6GB.

However, I've seen mention of a bug in the PySpark API when calling head()
on a DataFrame in Spark 1.4 and 1.5.0; it carries a big performance hit.

https://issues.apache.org/jira/browse/SPARK-10731

It's fixed in spark 1.5.1 which was released yesterday, so maybe try upgrading.

Ewan


-Original Message-
From: camelia [mailto:came...@chalmers.se] 
Sent: 30 September 2015 10:51
To: user@spark.apache.org
Subject: Need for advice - performance improvement and out of memory resolution

Hello,

I am working on a machine learning project, currently using
spark-1.4.1-bin-hadoop2.6 in local mode on a laptop (Ubuntu 14.04 OS running on 
a Dell laptop with i7-5600 @ 2.6 GHz * 4 cores, 15.6 GB RAM). I should also
mention that I am working in Python from an IPython notebook.
 

I face the following problem: when working with a DataFrame created from a CSV
file (2.7 GB) with an inferred schema (1900 features), the time it takes for
Spark to count the 145231 rows is 3:30 minutes using 4 cores. Longer times are
recorded for computing one feature's statistics, for example:


START AT: 2015-09-21
08:56:41.136947
 
+---+--+
|summary|  VAR_1933|
+---+--+
|  count|145231|
|   mean| 8849.839111484464|
| stddev|3175.7863998269395|
|min| 0|
|max|  |
+---+--+

 
FINISH AT: 2015-09-21
09:02:49.452260



So, my first question is: what configuration parameters should I set in order
to improve this performance?

I tried some explicit configuration in the IPython notebook, but specifying 
resources explicitly when creating the Spark configuration resulted in worse 
performance; I mean :

config = SparkConf().setAppName("cleankaggle").setMaster("local[4]") \
    .set("spark.jars", jar_path)

ran twice as fast as:

config = SparkConf().setAppName("cleankaggle").setMaster("local[4]") \
    .set("spark.jars", jar_path) \
    .set("spark.driver.memory", "2g") \
    .set("spark.python.worker.memory", "3g")





Secondly, when I do the one-hot encoding (I tried two different ways of keeping
the results) I never get as far as showing the head(1) of the resulting
dataframe. The function is:

from pyspark.ml.feature import OneHotEncoder, StringIndexer

def OHE_transform(categ_feat, df_old):
    outputcolname = categ_feat + "_ohe_index"
    outputcolvect = categ_feat + "_ohe_vector"
    stringIndexer = StringIndexer(inputCol=categ_feat, outputCol=outputcolname)
    indexed = stringIndexer.fit(df_old).transform(df_old)
    encoder = OneHotEncoder(inputCol=outputcolname, outputCol=outputcolvect)
    encoded = encoder.transform(indexed)
    return encoded


The two approaches for keeping the results are shown below:

1)

result = OHE_transform(list_top_feat[0], df_top_categ)
for item in list_top_feat[1:]:
    result = OHE_transform(item, result)
result.head(1)


2)

df_result = OHE_transform("VAR_A", df_top_categ)
df_result_1 = OHE_transform("VAR_B", df_result)
df_result_2 = OHE_transform("VAR_C", df_result_1) ...
df_result_12 = OHE_transform("VAR_X", df_result_11)
df_result_12.head(1)

In the first approach, at the third iteration (in the for loop), when it was
supposed to print the head(1), the IPython notebook remained in the state
"Kernel busy" for several hours and then I interrupted the kernel.
The second approach managed to go through all the transformations (note that
here I eliminated the intermediate prints of head(1)), but it gave an "out of
memory" error at the single (final result) head(1), which I paste below:

===

 

df_result_12.head(1)

---
Py4JJavaError Traceback (most recent call last)
 in ()
> 1 df_result_12.head(1)

/home/camelia/spark-1.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in 
head(self, n)
649 rs = self.head(1)
650 return rs[0] if rs else None
--> 651 return self.take(n)
652 
653 @ignore_unicode_prefix

/home/camelia/spark-1.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in 
take(self, num)
305 [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
306 """
--> 307 return self.limit(num).collect()
308 
309 @ignore_unicode_prefix

/home/camelia/spark-1.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in
collect(self)
279 """
280 with SCCallSiteSync(self._sc) as css:
--> 281 port =
self._sc._jvm.PythonRDD.collectAndServe(self._jdf.javaToPython().rdd())
282 rs = list(_load_from_socket(port,
BatchedSerializer(PickleSerializer(
283 cls = _create_cls(self.schema)

/home/camelia/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
in __call__(self, 

Re: ThrowableSerializationWrapper: Task exception could not be deserialized / ClassNotFoundException: org.apache.solr.common.SolrException

2015-09-30 Thread Dmitry Goldenberg
I believe I've had trouble with --conf spark.driver.userClassPathFirst=true
--conf spark.executor.userClassPathFirst=true before, so these might not
work...

I was thinking of trying to add the solr4j jar to
spark.executor.extraClassPath...
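For reference, a hedged sketch of that experiment expressed as configuration: extraClassPath entries are prepended to the executor classpath, which is the same mechanism that makes the hbase-protocol jar visible. The solrj jar name and version below are guesses, not the actual artifact:

import org.apache.spark.SparkConf

val acmeHome = "/mnt/acme/acme-ingest"   // deployment root used in this thread

val conf = new SparkConf()
  .set("spark.executor.extraClassPath",
       s"$acmeHome/conf:$acmeHome/lib/hbase-protocol-0.98.9-hadoop2.jar:" +
       s"$acmeHome/lib/solr-solrj-4.10.4.jar")  // jar name/version is a guess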

On Wed, Sep 30, 2015 at 12:01 PM, Ted Yu  wrote:

> bq. have tried these settings with the hbase protocol jar, to no avail
>
> In that case, HBaseZeroCopyByteString is contained in hbase-protocol.jar.
> In HBaseZeroCopyByteString , you can see:
>
> package com.google.protobuf;  // This is a lie.
>
> If protobuf jar is loaded ahead of hbase-protocol.jar, things start to get
> interesting ...
>
> On Tue, Sep 29, 2015 at 6:12 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> Ted, I think I have tried these settings with the hbase protocol jar, to
>> no avail.
>>
>> I'm going to see if I can try and use these with this SolrException issue
>> though it now may be harder to reproduce it. Thanks for the suggestion.
>>
>> On Tue, Sep 29, 2015 at 8:03 PM, Ted Yu  wrote:
>>
>>> Have you tried the following ?
>>> --conf spark.driver.userClassPathFirst=true --conf spark.executor.
>>> userClassPathFirst=true
>>>
>>> On Tue, Sep 29, 2015 at 4:38 PM, Dmitry Goldenberg <
>>> dgoldenberg...@gmail.com> wrote:
>>>
 Release of Spark: 1.5.0.

 Command line invocation:

 ACME_INGEST_HOME=/mnt/acme/acme-ingest
 ACME_INGEST_VERSION=0.0.1-SNAPSHOT
 ACME_BATCH_DURATION_MILLIS=5000
 SPARK_MASTER_URL=spark://data1:7077
 JAVA_OPTIONS="-Dspark.streaming.kafka.maxRatePerPartition=1000"
 JAVA_OPTIONS="$JAVA_OPTIONS -Dspark.executor.memory=2g"

 $SPARK_HOME/bin/spark-submit \
 --driver-class-path  $ACME_INGEST_HOME \
 --driver-java-options "$JAVA_OPTIONS" \
 --class
 "com.acme.consumer.kafka.spark.KafkaSparkStreamingDriver" \
 --master $SPARK_MASTER_URL  \
 --conf
 "spark.executor.extraClassPath=$ACME_INGEST_HOME/conf:$ACME_INGEST_HOME/lib/hbase-protocol-0.98.9-hadoop2.jar"
 \

 $ACME_INGEST_HOME/lib/acme-ingest-kafka-spark-$ACME_INGEST_VERSION.jar \
 -brokerlist $METADATA_BROKER_LIST \
 -topic acme.topic1 \
 -autooffsetreset largest \
 -batchdurationmillis $ACME_BATCH_DURATION_MILLIS \
 -appname Acme.App1 \
 -checkpointdir file://$SPARK_HOME/acme/checkpoint-acme-app1
 Note that SolrException is definitely in our consumer jar
 acme-ingest-kafka-spark-$ACME_INGEST_VERSION.jar which gets deployed to
 $ACME_INGEST_HOME.

 For the extraClassPath on the executors, we've got additionally
 hbase-protocol-0.98.9-hadoop2.jar: we're using Apache Phoenix from the
 Spark jobs to communicate with HBase.  The only way to force Phoenix to
 successfully communicate with HBase was to have that JAR explicitly added
 to the executor classpath regardless of the fact that the contents of the
 hbase-protocol hadoop jar get rolled up into the consumer jar at build 
 time.

 I'm starting to wonder whether there's some class loading pattern here
 where some classes may not get loaded out of the consumer jar and therefore
 have to have their respective jars added to the executor extraClassPath?

 Or is this a serialization problem for SolrException as Divya
 Ravichandran suggested?




 On Tue, Sep 29, 2015 at 6:16 PM, Ted Yu  wrote:

> Mind providing a bit more information:
>
> release of Spark
> command line for running Spark job
>
> Cheers
>
> On Tue, Sep 29, 2015 at 1:37 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> We're seeing this occasionally. Granted, this was caused by a wrinkle
>> in the Solr schema but this bubbled up all the way in Spark and caused 
>> job
>> failures.
>>
>> I just checked and SolrException class is actually in the consumer
>> job jar we use.  Is there any reason why Spark cannot find the
>> SolrException class?
>>
>> 15/09/29 15:41:58 WARN ThrowableSerializationWrapper: Task exception
>> could not be deserialized
>> java.lang.ClassNotFoundException: org.apache.solr.common.SolrException
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:348)
>> at
>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
>> at
>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>> at
>> 

Re: UnknownHostException with Mesos and custom Jar

2015-09-30 Thread Akhil Das
That's strange; for some reason your Hadoop configuration is not being picked
up by Spark.

Thanks
Best Regards
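A hedged narrowing-down step, building on the earlier suggestion in this thread: address the active namenode directly instead of the "affinio" nameservice. If this works while hdfs://affinio/... fails, the executors are most likely not seeing the HDFS client configuration (hdfs-site.xml) that defines the nameservice. Host and port are placeholders:

sc.textFile("hdfs://namenode-host:8020/tmp/Input")
  .collect()
  .foreach(println)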

On Wed, Sep 30, 2015 at 9:11 PM, Stephen Hankinson 
wrote:

> When I use hdfs://affinio/tmp/Input it gives the same error about
> UnknownHostException affinio.
>
> However, from the command line I can run hdfs dfs -ls /tmp/Input or hdfs
> dfs -ls hdfs://affinio/tmp/Input and they work correctly.
>
> See more details here:
> http://stackoverflow.com/questions/32833860/unknownhostexception-with-mesos-spark-and-custom-jar
>
> Stephen Hankinson, P. Eng.
> CTO
> Affinio Inc.
> 301 - 211 Horseshoe Lake Dr.
> Halifax, Nova Scotia, Canada
> B3S 0B9
>
> http://www.affinio.com
>
> On Wed, Sep 30, 2015 at 4:21 AM, Akhil Das 
> wrote:
>
>> Can you try replacing your code with the hdfs uri? like:
>>
>> sc.textFile("hdfs://...").collect().foreach(println)
>>
>> Thanks
>> Best Regards
>>
>> On Tue, Sep 29, 2015 at 1:45 AM, Stephen Hankinson 
>> wrote:
>>
>>> Hi,
>>>
>>> Wondering if anyone can help me with the issue I am having.
>>>
>>> I am receiving an UnknownHostException when running a custom jar with
>>> Spark on Mesos. The issue does not happen when running spark-shell.
>>>
>>> My spark-env.sh contains the following:
>>>
>>> export MESOS_NATIVE_JAVA_LIBRARY=/usr/local/lib/libmesos.so
>>>
>>> export HADOOP_CONF_DIR=/hadoop-2.7.1/etc/hadoop/
>>>
>>> My spark-defaults.conf contains the following:
>>>
>>> spark.master   mesos://zk://172.31.0.81:2181,
>>> 172.31.16.81:2181,172.31.32.81:2181/mesos
>>>
>>> spark.mesos.executor.home  /spark-1.5.0-bin-hadoop2.6/
>>>
>>> Starting spark-shell as follows and running the following line works
>>> correctly:
>>>
>>> /spark-1.5.0-bin-hadoop2.6/bin/spark-shell
>>>
>>> sc.textFile("/tmp/Input").collect.foreach(println)
>>>
>>> 15/09/28 20:04:49 INFO storage.MemoryStore: ensureFreeSpace(88528)
>>> called with curMem=0, maxMem=556038881
>>>
>>> 15/09/28 20:04:49 INFO storage.MemoryStore: Block broadcast_0 stored as
>>> values in memory (estimated size 86.5 KB, free 530.2 MB)
>>>
>>> 15/09/28 20:04:49 INFO storage.MemoryStore: ensureFreeSpace(20236)
>>> called with curMem=88528, maxMem=556038881
>>>
>>> 15/09/28 20:04:49 INFO storage.MemoryStore: Block broadcast_0_piece0
>>> stored as bytes in memory (estimated size 19.8 KB, free 530.2 MB)
>>>
>>> 15/09/28 20:04:49 INFO storage.BlockManagerInfo: Added
>>> broadcast_0_piece0 in memory on 172.31.21.104:49048 (size: 19.8 KB,
>>> free: 530.3 MB)
>>>
>>> 15/09/28 20:04:49 INFO spark.SparkContext: Created broadcast 0 from
>>> textFile at :22
>>>
>>> 15/09/28 20:04:49 INFO mapred.FileInputFormat: Total input paths to
>>> process : 1
>>>
>>> 15/09/28 20:04:49 INFO spark.SparkContext: Starting job: collect at
>>> :22
>>>
>>> 15/09/28 20:04:49 INFO scheduler.DAGScheduler: Got job 0 (collect at
>>> :22) with 3 output partitions
>>>
>>> 15/09/28 20:04:49 INFO scheduler.DAGScheduler: Final stage: ResultStage
>>> 0(collect at :22)
>>>
>>> 15/09/28 20:04:49 INFO scheduler.DAGScheduler: Parents of final stage:
>>> List()
>>>
>>> 15/09/28 20:04:49 INFO scheduler.DAGScheduler: Missing parents: List()
>>>
>>> 15/09/28 20:04:49 INFO scheduler.DAGScheduler: Submitting ResultStage 0
>>> (MapPartitionsRDD[1] at textFile at :22), which has no missing
>>> parents
>>>
>>> 15/09/28 20:04:49 INFO storage.MemoryStore: ensureFreeSpace(3120) called
>>> with curMem=108764, maxMem=556038881
>>>
>>> 15/09/28 20:04:49 INFO storage.MemoryStore: Block broadcast_1 stored as
>>> values in memory (estimated size 3.0 KB, free 530.2 MB)
>>>
>>> 15/09/28 20:04:49 INFO storage.MemoryStore: ensureFreeSpace(1784) called
>>> with curMem=111884, maxMem=556038881
>>>
>>> 15/09/28 20:04:49 INFO storage.MemoryStore: Block broadcast_1_piece0
>>> stored as bytes in memory (estimated size 1784.0 B, free 530.2 MB)
>>>
>>> 15/09/28 20:04:49 INFO storage.BlockManagerInfo: Added
>>> broadcast_1_piece0 in memory on 172.31.21.104:49048 (size: 1784.0 B,
>>> free: 530.3 MB)
>>>
>>> 15/09/28 20:04:49 INFO spark.SparkContext: Created broadcast 1 from
>>> broadcast at DAGScheduler.scala:861
>>>
>>> 15/09/28 20:04:49 INFO scheduler.DAGScheduler: Submitting 3 missing
>>> tasks from ResultStage 0 (MapPartitionsRDD[1] at textFile at :22)
>>>
>>> 15/09/28 20:04:49 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0
>>> with 3 tasks
>>>
>>> 15/09/28 20:04:49 INFO scheduler.TaskSetManager: Starting task 0.0 in
>>> stage 0.0 (TID 0, ip-172-31-37-82.us-west-2.compute.internal, NODE_LOCAL,
>>> 2142 bytes)
>>>
>>> 15/09/28 20:04:49 INFO scheduler.TaskSetManager: Starting task 1.0 in
>>> stage 0.0 (TID 1, ip-172-31-21-104.us-west-2.compute.internal, NODE_LOCAL,
>>> 2142 bytes)
>>>
>>> 15/09/28 20:04:49 INFO scheduler.TaskSetManager: Starting task 2.0 in
>>> stage 0.0 (TID 2, ip-172-31-4-4.us-west-2.compute.internal, NODE_LOCAL,
>>> 2142 bytes)
>>>
>>> 15/09/28 20:04:52 INFO 

New spark meetup

2015-09-30 Thread Yogesh Mahajan
Hi,

Can you please get this new spark meetup listed on the spark community page -
http://spark.apache.org/community.html#events

Here is the link for the meetup in Pune, India:
http://www.meetup.com/Pune-Apache-Spark-Meetup/

Thanks,
Yogesh

Sent from my iPhone

Re: unsubscribe

2015-09-30 Thread Richard Hillegas

Hi Sukesh,

To unsubscribe from the dev list, please send a message to
dev-unsubscr...@spark.apache.org. To unsubscribe from the user list, please
send a message to user-unsubscr...@spark.apache.org. Please see:
http://spark.apache.org/community.html#mailing-lists.

Thanks,
-Rick

sukesh kumar  wrote on 09/28/2015 11:39:01 PM:

> From: sukesh kumar 
> To: "user@spark.apache.org" ,
> "d...@spark.apache.org" 
> Date: 09/28/2015 11:39 PM
> Subject: unsubscribe
>
> unsubscribe
>
> --
> Thanks & Best Regards
> Sukesh Kumar