RE: what is the optimized way to combine multiple dataframes into one dataframe ?

2016-11-15 Thread Shreya Agarwal
If you are reading all these datasets from files in persistent storage, 
functions like sc.textFile can take folders/patterns as input and read all 
matching files into the same RDD. Then you can convert it to a dataframe.
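
For example, a minimal sketch of this approach in Spark 2.0, with placeholder paths and assuming three comma-separated columns per line:

// Option 1: the DataFrame reader accepts directories and glob patterns directly.
val all = spark.read
  .csv("/data/interest/*.csv")                          // placeholder path/pattern
  .toDF("client_id", "product_id", "interest")

// Option 2: read the raw text into one RDD first, then convert, as described above.
import spark.implicits._
val df = spark.sparkContext.textFile("/data/interest/")  // a folder as input
  .map(_.split(","))
  .map(a => (a(0), a(1), a(2)))
  .toDF("client_id", "product_id", "interest")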

When you say it is time consuming with union, how are you measuring that? Did 
you compare having all of the data in one DF against having it broken into four? 
Are you seeing a non-linear slowdown in operations after the union as the data 
size increases linearly?
Sent from my Windows 10 phone

From: Devi P.V
Sent: Tuesday, November 15, 2016 11:06 PM
To: user @spark
Subject: what is the optimized way to combine multiple dataframes into one 
dataframe ?

Hi all,

I have 4 data frames with three columns,

client_id,product_id,interest

I want to combine these 4 dataframes into one dataframe. I used union like the 
following

df1.union(df2).union(df3).union(df4)

But it is time consuming for big data. What is the optimized way of doing this 
using Spark 2.0 & Scala?


Thanks


RE: AVRO File size when caching in-memory

2016-11-15 Thread Shreya Agarwal
(Adding user@spark back to the discussion)

Well, the CSV vs. Avro difference might be simpler to explain. CSV has a lot of scope for 
compression. On the other hand, Avro and Parquet are already compressed and just 
store extra schema info, AFAIK. Avro and Parquet are both going to make your 
data smaller: Parquet through compressed columnar storage, and Avro through its 
binary data format.

Next, about the 62 KB becoming 1224 KB: I actually do not see such a 
massive blow-up. The Avro file you shared is 28 KB on my system and becomes 53.7 KB 
when cached in memory deserialized, and 52.9 KB when cached in memory serialized. 
Exact same numbers with Parquet as well. This is expected behavior, if I am not 
wrong.

In fact, now that I think about it, even larger blow-ups might be valid, since 
your data must have been deserialized from the compressed Avro format, making 
it bigger. The order of magnitude of the difference in size would depend on the 
type of data you have and how well it was compressible.

The purpose of these formats is to store data to persistent storage in a way 
that's faster to read from, not to reduce cache-memory usage.

Maybe others here have more info to share.

Regards,
Shreya

Sent from my Windows 10 phone

From: Prithish
Sent: Tuesday, November 15, 2016 11:04 PM
To: Shreya Agarwal
Subject: Re: AVRO File size when caching in-memory

I did another test and noting my observations here. These were done with the 
same data in avro and csv formats.

In Avro, the file size on disk was 62 KB and, after caching, the in-memory size 
is 1224 KB.
In CSV, the file size on disk was 690 KB and, after caching, the in-memory size 
is 290 KB.

I'm guessing that Spark caching is not able to compress when the source is 
Avro. Not sure if this is just a premature conclusion. Waiting to hear your 
observations.

On Wed, Nov 16, 2016 at 12:14 PM, Prithish 
> wrote:
Thanks for your response.

I have attached the code (that I ran using the Spark-shell) as well as a sample 
avro file. After you run this code, the data is cached in memory and you can go 
to the "storage" tab on the Spark-ui (localhost:4040) and see the size it uses. 
In this example the size is small, but in my actual scenario, the source file 
size is 30GB and the in-memory size comes to around 800GB. I am trying to 
understand if this is expected when using avro or not.
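
Since the attachment itself is not included here, a minimal spark-shell sketch of the kind of snippet being described, assuming the databricks spark-avro package and a placeholder file path:

// started with: spark-shell --packages com.databricks:spark-avro_2.11:3.0.1
val df = spark.read.format("com.databricks.spark.avro").load("/path/to/sample.avro")
df.createOrReplaceTempView("sample")
spark.catalog.cacheTable("sample")
spark.table("sample").count()   // materialize the cache, then check the Storage tab at localhost:4040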

On Wed, Nov 16, 2016 at 10:37 AM, Shreya Agarwal 
> wrote:
I haven't used Avro ever. But if you can send over a quick sample code, I can 
run and see if I repro it and maybe debug.

From: Prithish [mailto:prith...@gmail.com]
Sent: Tuesday, November 15, 2016 8:44 PM
To: Jörn Franke >
Cc: User >
Subject: Re: AVRO File size when caching in-memory

Anyone?

On Tue, Nov 15, 2016 at 10:45 AM, Prithish 
> wrote:
I am using 2.0.1 and databricks avro library 3.0.1. I am running this on the 
latest AWS EMR release.

On Mon, Nov 14, 2016 at 3:06 PM, Jörn Franke 
> wrote:
spark version? Are you using tungsten?

> On 14 Nov 2016, at 10:05, Prithish 
> > wrote:
>
> Can someone please explain why this happens?
>
> When I read a 600kb AVRO file and cache this in memory (using cacheTable), it 
> shows up as 11mb (storage tab in Spark UI). I have tried this with different 
> file sizes, and the size in-memory is always proportionate. I thought Spark 
> compresses when using cacheTable.






what is the optimized way to combine multiple dataframes into one dataframe ?

2016-11-15 Thread Devi P.V
Hi all,

I have 4 data frames with three columns,

client_id,product_id,interest

I want to combine these 4 dataframes into one dataframe. I used union like the
following

df1.union(df2).union(df3).union(df4)

But it is time consuming for big data. What is the optimized way of doing
this using Spark 2.0 & Scala?


Thanks


Re: AVRO File size when caching in-memory

2016-11-15 Thread Prithish
Anyone?

On Tue, Nov 15, 2016 at 10:45 AM, Prithish  wrote:

> I am using 2.0.1 and databricks avro library 3.0.1. I am running this on
> the latest AWS EMR release.
>
> On Mon, Nov 14, 2016 at 3:06 PM, Jörn Franke  wrote:
>
>> spark version? Are you using tungsten?
>>
>> > On 14 Nov 2016, at 10:05, Prithish  wrote:
>> >
>> > Can someone please explain why this happens?
>> >
>> > When I read a 600kb AVRO file and cache this in memory (using
>> cacheTable), it shows up as 11mb (storage tab in Spark UI). I have tried
>> this with different file sizes, and the size in-memory is always
>> proportionate. I thought Spark compresses when using cacheTable.
>>
>
>


Re: Joining to a large, pre-sorted file

2016-11-15 Thread Rohit Verma
You can try coalesce on the join statement:
val result = master.join(transaction, "key")
  .coalesce(numMasterPartitions)  // numMasterPartitions = number of partitions in master
On Nov 15, 2016, at 8:07 PM, Stuart White 
> wrote:

It seems that the number of files could possibly get out of hand using this 
approach.

For example, in the job that buckets and writes master, assuming we use the 
default number of shuffle partitions (200), and assuming that in the next job 
(the job where we join to transaction), we're also going to want to use 200 
partitions, that means master would be written to disk in 40,000 files (200 
partitions, each writing 200 bucket files).  Am I mistaken?

Is there some way to avoid this explosion of the number of files?  Or is this 
just an unavoidable side-effect of Spark's bucketing implementation?

Thanks again!

On Sun, Nov 13, 2016 at 9:24 AM, Silvio Fiorito 
> wrote:

Hi Stuart,

Yes that's the query plan but if you take a look at my screenshot it skips the 
first stage since the datasets are co-partitioned.

Thanks,
Silvio


From: Stuart White >
Sent: Saturday, November 12, 2016 11:20:28 AM
To: Silvio Fiorito
Cc: user@spark.apache.org
Subject: Re: Joining to a large, pre-sorted file

Hi Silvio,

Thanks very much for the response!

I'm pretty new at reading explain plans, so maybe I'm misunderstanding what I'm 
seeing.

Remember my goal is to sort master, write it out, later read it back in and 
have Spark "remember" that it's sorted, so I can do joins and Spark will not 
sort it again.

Looking at the explain plan for the example job you provided, it looks to me 
like Spark is re-sorting master after reading it back in.  See the attachment 
for the Sort step I'm referring to.

Am I misunderstanding the explain plan?

Thanks again!

On Sat, Nov 12, 2016 at 9:34 AM, Silvio Fiorito 
> wrote:

Hi Stuart,



You don’t need the sortBy or sortWithinPartitions.



https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/594325853373464/879901972425732/6861830365114179/latest.html





This is what the job should look like:





On 11/12/16, 8:40 AM, "Stuart White" 
> wrote:



Thanks for the reply.

I understand that I need to use bucketBy() to write my master file, but I
still can't seem to make it work as expected.  Here's a code example for how
I'm writing my master file:

Range(0, 100)
  .map(i => (i, s"master_$i"))
  .toDF("key", "value")
  .write
  .format("json")
  .bucketBy(3, "key")
  .sortBy("key")
  .saveAsTable("master")

And here's how I'm reading it later and attempting to join to a transaction
dataset:

val master = spark
  .read
  .format("json")
  .json("spark-warehouse/master")
  .cache

val transaction = Range(0, 100)
  .map(i => (i, s"transaction_$i"))
  .toDF("key", "value")
  .repartition(3, 'key)
  .sortWithinPartitions('key)
  .cache

val results = master.join(transaction, "key")

When I call results.explain(), I see that it is sorting both datasets before
sending them through SortMergeJoin.

== Physical Plan ==
*Project [key#0L, value#1, value#53]
+- *SortMergeJoin [key#0L], [cast(key#52 as bigint)], Inner
   :- *Sort [key#0L ASC], false, 0
   :  +- Exchange hashpartitioning(key#0L, 200)
   :     +- *Filter isnotnull(key#0L)
   :        +- InMemoryTableScan [key#0L, value#1], [isnotnull(key#0L)]
   :              +- InMemoryRelation [key#0L, value#1], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas)
   :                    +- *Scan json [key#0L,value#1] Format: JSON, InputPaths: file:/work/spark-warehouse/master, PartitionFilters: [], PushedFilters: [], ReadSchema: struct
   +- *Sort [cast(key#52 as bigint) ASC], false, 0
      +- Exchange hashpartitioning(cast(key#52 as bigint), 200)
         +- InMemoryTableScan [key#52, value#53]
               +- InMemoryRelation [key#52, value#53], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- *Sort [key#52 ASC], false, 0
                        +- Exchange hashpartitioning(key#52, 3)
                           +- LocalTableScan [key#52, value#53]

Here are my thoughts:

1. I think I'm probably reading the master file back into memory incorrectly.
I think maybe I should be reading it as a Hive table rather than just a plain
json file, but I can't seem to figure out how to do that.

2. I don't understand 
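
On point 1, a minimal sketch of reading the bucketed output back through the catalog instead of as plain JSON files, which should make the metadata recorded by saveAsTable visible to the planner (assuming the same "master" table written above):

// spark.table() resolves the table created by saveAsTable, including its
// bucketing/sorting information, whereas spark.read.json() sees only raw files.
val master = spark.table("master")
val results = master.join(transaction, "key")
results.explain()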

Re: Does the delegator map task of SparkLauncher need to stay alive until Spark job finishes ?

2016-11-15 Thread Elkhan Dadashov
Thanks for the clarification, Marcelo.

On Tue, Nov 15, 2016 at 6:20 PM Marcelo Vanzin  wrote:

> On Tue, Nov 15, 2016 at 5:57 PM, Elkhan Dadashov 
> wrote:
> > This is confusing in the sense that, the client needs to stay alive for
> > Spark Job to finish successfully.
> >
> > Actually the client can die  or finish (in Yarn-cluster mode), and the
> spark
> > job will successfully finish.
>
> That's an internal class, and you're looking at an internal javadoc
> that describes how the app handle works. For the app handle to be
> updated, the "client" (i.e. the sub process) needs to stay alive. So
> the javadoc is correct. It has nothing to do with whether the
> application succeeds or not.
>
>
> --
> Marcelo
>


Re: Problem submitting a spark job using yarn-client as master

2016-11-15 Thread Rohit Verma
You can set HDFS as the default filesystem:

sparksession.sparkContext().hadoopConfiguration().set("fs.defaultFS", 
"hdfs://master_node:8020");

Regards
Rohit

On Nov 16, 2016, at 3:15 AM, David Robison 
> wrote:

I am trying to submit a spark job through the yarn-client master setting. The 
job gets created and submitted to the clients but immediately errors out. Here 
is the relevant portion of the log:

15:39:37,385 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Requesting a new application from cluster with 1 NodeManagers
15:39:37,397 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Verifying our application has not requested more than the maximum memory 
capability of the cluster (4608 MB per container)
15:39:37,398 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) Will 
allocate AM container, with 896 MB memory including 384 MB overhead
15:39:37,399 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Setting up container launch context for our AM
15:39:37,403 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Setting up the launch environment for our AM container
15:39:37,427 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Preparing resources for our AM container
15:39:37,845 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Source and destination file systems are the same. Not copying 
file:/opt/wildfly/modules/org/apache/hadoop/client/main/spark-yarn_2.10-1.6.2.jar
15:39:38,050 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Source and destination file systems are the same. Not copying 
file:/tmp/spark-fa954c4a-a6cd-4675-8610-67ce858b4842/__spark_conf__1435451360463636119.zip
15:39:38,102 INFO  [org.apache.spark.SecurityManager] (default task-1) Changing 
view acls to: wildfly,hdfs
15:39:38,105 INFO  [org.apache.spark.SecurityManager] (default task-1) Changing 
modify acls to: wildfly,hdfs
15:39:38,105 INFO  [org.apache.spark.SecurityManager] (default task-1) 
SecurityManager: authentication disabled; ui acls disabled; users with view 
permissions: Set(wildfly, hdfs); users with modify permissions: Set(wildfly, 
hdfs)
15:39:38,138 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Submitting application 5 to ResourceManager
15:39:38,256 INFO  [org.apache.hadoop.yarn.client.api.impl.YarnClientImpl] 
(default task-1) Submitted application application_1479240217825_0005
15:39:39,269 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Application report for application_1479240217825_0005 (state: ACCEPTED)
15:39:39,279 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1)
 client token: N/A
diagnostics: N/A
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1479242378159
final status: UNDEFINED
tracking URL: 
http://vb1.localdomain:8088/proxy/application_1479240217825_0005/
user: hdfs
15:39:40,285 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Application report for application_1479240217825_0005 (state: ACCEPTED)
15:39:41,290 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Application report for application_1479240217825_0005 (state: ACCEPTED)
15:39:42,295 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Application report for application_1479240217825_0005 (state: FAILED)
15:39:42,295 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1)
 client token: N/A
diagnostics: Application application_1479240217825_0005 
failed 2 times due to AM Container for appattempt_1479240217825_0005_02 
exited with  exitCode: -1000
For more detailed output, check application tracking 
page:http://vb1.localdomain:8088/cluster/app/application_1479240217825_0005Then,
 click on links to logs of each attempt.
Diagnostics: File 
file:/tmp/spark-fa954c4a-a6cd-4675-8610-67ce858b4842/__spark_conf__1435451360463636119.zip
 does not exist
java.io.FileNotFoundException: File 
file:/tmp/spark-fa954c4a-a6cd-4675-8610-67ce858b4842/__spark_conf__1435451360463636119.zip
 does not exist
at 
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:609)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:822)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:599)
at 
org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)


Notice that the file __spark_conf__1435451360463636119.zip is not copied 
because it exists, I believe, on HDFS. However, when the client goes to fetch 
it, it reports that it does not exist, probably because it is trying to 
get it from "file:/tmp" and not from HDFS. Any idea how I can get this to work?

Re: Does the delegator map task of SparkLauncher need to stay alive until Spark job finishes ?

2016-11-15 Thread Marcelo Vanzin
On Tue, Nov 15, 2016 at 5:57 PM, Elkhan Dadashov  wrote:
> This is confusing in the sense that, the client needs to stay alive for
> Spark Job to finish successfully.
>
> Actually the client can die  or finish (in Yarn-cluster mode), and the spark
> job will successfully finish.

That's an internal class, and you're looking at an internal javadoc
that describes how the app handle works. For the app handle to be
updated, the "client" (i.e. the sub process) needs to stay alive. So
the javadoc is correct. It has nothing to do with whether the
application succeeds or not.


-- 
Marcelo

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Does the delegator map task of SparkLauncher need to stay alive until Spark job finishes ?

2016-11-15 Thread Elkhan Dadashov
Hi Marcelo,

This part of the Javadoc is confusing:

https://github.com/apache/spark/blob/master/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java


"
* In *cluster mode*, this means that the client that launches the
* application *must remain alive for the duration of the application* (or
until the app handle is
* disconnected).
*/
class LauncherServer implements Closeable {
"
This is confusing in the sense that it suggests the client needs to stay alive
for the Spark job to finish successfully.

Actually, the client can die or finish (in YARN cluster mode), and the
Spark job will still finish successfully.

Yes, the client needs to stay alive until the appHandle state is Submitted (or
maybe Running), but not until the final state, unless you want to query the
state of the Spark app using the appHandle.

I still do not get the meaning of the comment above.

Thanks.
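
For reference, a minimal sketch (placeholder jar path and class name) of launching through SparkLauncher and polling the handle state from the launching process, which is the "client" the javadoc refers to:

import org.apache.spark.launcher.SparkLauncher

val handle = new SparkLauncher()
  .setAppResource("/path/to/app.jar")      // placeholder
  .setMainClass("com.example.Main")        // placeholder
  .setMaster("yarn")
  .setDeployMode("cluster")
  .startApplication()

// State updates only arrive while this launching process stays alive and connected.
while (!handle.getState.isFinal) {
  println(s"state = ${handle.getState}")
  Thread.sleep(5000)
}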


On Tue, Oct 18, 2016 at 3:07 PM Marcelo Vanzin  wrote:

> On Tue, Oct 18, 2016 at 3:01 PM, Elkhan Dadashov 
> wrote:
> > Does my map task need to wait until Spark job finishes ?
>
> No...
>
> > Or is there any way, my map task finishes after launching Spark job, and
> I
> > can still query and get status of Spark job outside of map task (or
> failure
> > reason, if it has failed) ? (maybe by querying Spark job id ?)
>
> ...but if the SparkLauncher handle goes away, then you lose the
> ability to track the app's state, unless you talk directly to the
> cluster manager.
>
> > I guess also if i want my Spark job to be killed, if corresponding
> delegator
> > map task is killed, that means my map task needs to stay alive, so i
> still
> > have SparkAppHandle reference ?
>
> Correct, unless you talk directly to the cluster manager.
>
> --
> Marcelo
>


Re: Spark-xml - OutOfMemoryError: Requested array size exceeds VM limit

2016-11-15 Thread Arun Patel
Thanks for the quick response.

It's a single XML file and I am using a top-level rowTag.  So, it creates
only one row in a Dataframe with 5 columns. One of these columns will
contain most of the data as a StructType.  Is there a limit on how much
data can be stored in a single cell of a Dataframe?

I will check with the new version, try different rowTags, and increase
executor-memory tomorrow. I will open a new issue as well.



On Tue, Nov 15, 2016 at 7:52 PM, Hyukjin Kwon  wrote:

> Hi Arun,
>
>
> I have few questions.
>
> Dose your XML file have like few huge documents? In this case of a row
> having a huge size like (like 500MB), it would consume a lot of memory
>
> becuase at least it should hold a row to iterate if I remember correctly.
> I remember this happened to me before while processing a huge record for
> test purpose.
>
>
> How about trying to increase --executor-memory?
>
>
> Also, you could try to select only few fields to prune the data with the
> latest version just to doubly sure if you don't mind?.
>
>
> Lastly, do you mind if I ask to open an issue in https://github.com/
> databricks/spark-xml/issues if you still face this problem?
>
> I will try to take a look at my best.
>
>
> Thank you.
>
>
> 2016-11-16 9:12 GMT+09:00 Arun Patel :
>
>> I am trying to read an XML file which is 1GB is size.  I am getting an
>> error 'java.lang.OutOfMemoryError: Requested array size exceeds VM
>> limit' after reading 7 partitions in local mode.  In Yarn mode, it
>> throws 'java.lang.OutOfMemoryError: Java heap space' error after reading
>> 3 partitions.
>>
>> Any suggestion?
>>
>> PySpark Shell Command:pyspark --master local[4] --driver-memory 3G
>> --jars / tmp/spark-xml_2.10-0.3.3.jar
>>
>>
>>
>> Dataframe Creation Command:   df = sqlContext.read.format('com.da
>> tabricks.spark.xml').options(rowTag='GGL').load('GGL_1.2G.xml')
>>
>>
>>
>> 16/11/15 18:27:04 INFO TaskSetManager: Finished task 1.0 in stage 0.0
>> (TID 1) in 25978 ms on localhost (1/10)
>>
>> 16/11/15 18:27:04 INFO NewHadoopRDD: Input split:
>> hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:268435456+134217728
>>
>> 16/11/15 18:27:55 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2).
>> 2309 bytes result sent to driver
>>
>> 16/11/15 18:27:55 INFO TaskSetManager: Starting task 3.0 in stage 0.0
>> (TID 3, localhost, partition 3,ANY, 2266 bytes)
>>
>> 16/11/15 18:27:55 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
>>
>> 16/11/15 18:27:55 INFO TaskSetManager: Finished task 2.0 in stage 0.0
>> (TID 2) in 51001 ms on localhost (2/10)
>>
>> 16/11/15 18:27:55 INFO NewHadoopRDD: Input split:
>> hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:402653184+134217728
>>
>> 16/11/15 18:28:19 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3).
>> 2309 bytes result sent to driver
>>
>> 16/11/15 18:28:19 INFO TaskSetManager: Starting task 4.0 in stage 0.0
>> (TID 4, localhost, partition 4,ANY, 2266 bytes)
>>
>> 16/11/15 18:28:19 INFO Executor: Running task 4.0 in stage 0.0 (TID 4)
>>
>> 16/11/15 18:28:19 INFO TaskSetManager: Finished task 3.0 in stage 0.0
>> (TID 3) in 24336 ms on localhost (3/10)
>>
>> 16/11/15 18:28:19 INFO NewHadoopRDD: Input split:
>> hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:536870912+134217728
>>
>> 16/11/15 18:28:40 INFO Executor: Finished task 4.0 in stage 0.0 (TID 4).
>> 2309 bytes result sent to driver
>>
>> 16/11/15 18:28:40 INFO TaskSetManager: Starting task 5.0 in stage 0.0
>> (TID 5, localhost, partition 5,ANY, 2266 bytes)
>>
>> 16/11/15 18:28:40 INFO Executor: Running task 5.0 in stage 0.0 (TID 5)
>>
>> 16/11/15 18:28:40 INFO TaskSetManager: Finished task 4.0 in stage 0.0
>> (TID 4) in 20895 ms on localhost (4/10)
>>
>> 16/11/15 18:28:40 INFO NewHadoopRDD: Input split:
>> hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:671088640+134217728
>>
>> 16/11/15 18:29:01 INFO Executor: Finished task 5.0 in stage 0.0 (TID 5).
>> 2309 bytes result sent to driver
>>
>> 16/11/15 18:29:01 INFO TaskSetManager: Starting task 6.0 in stage 0.0
>> (TID 6, localhost, partition 6,ANY, 2266 bytes)
>>
>> 16/11/15 18:29:01 INFO Executor: Running task 6.0 in stage 0.0 (TID 6)
>>
>> 16/11/15 18:29:01 INFO TaskSetManager: Finished task 5.0 in stage 0.0
>> (TID 5) in 20793 ms on localhost (5/10)
>>
>> 16/11/15 18:29:01 INFO NewHadoopRDD: Input split:
>> hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:805306368+134217728
>>
>> 16/11/15 18:29:22 INFO Executor: Finished task 6.0 in stage 0.0 (TID 6).
>> 2309 bytes result sent to driver
>>
>> 16/11/15 18:29:22 INFO TaskSetManager: Starting task 7.0 in stage 0.0
>> (TID 7, localhost, partition 7,ANY, 2266 bytes)
>>
>> 16/11/15 18:29:22 INFO Executor: Running task 7.0 in stage 0.0 (TID 7)
>>
>> 16/11/15 18:29:22 INFO TaskSetManager: Finished task 6.0 in stage 0.0
>> (TID 6) in 21306 ms on localhost (6/10)
>>
>> 16/11/15 18:29:22 INFO NewHadoopRDD: Input split:
>> hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:939524096+134217728
>>
>> 16/11/15 

Re: Spark-xml - OutOfMemoryError: Requested array size exceeds VM limit

2016-11-15 Thread Hyukjin Kwon
Hi Arun,


I have few questions.

Does your XML file have a few huge documents? In the case of a row having a
huge size (like 500MB), it would consume a lot of memory, because at least it
has to hold a whole row while iterating, if I remember correctly. I remember
this happened to me before while processing a huge record for test purposes.


How about trying to increase --executor-memory?


Also, you could try to select only a few fields to prune the data with the
latest version, just to be doubly sure, if you don't mind.


Lastly, do you mind if I ask to open an issue in
https://github.com/databricks/spark-xml/issues if you still face this
problem?

I will try my best to take a look.


Thank you.
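
For reference, a minimal Scala sketch of the pruning suggestion above; the row tag and field names are placeholders, and it assumes the databricks spark-xml package is on the classpath:

// Select only the fields that are actually needed so the rest of each record can be pruned.
val df = spark.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "GGL")                 // placeholder row tag
  .load("GGL_1.2G.xml")
  .select("fieldA", "fieldB")              // placeholder field names

df.count()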


2016-11-16 9:12 GMT+09:00 Arun Patel :

> I am trying to read an XML file which is 1GB is size.  I am getting an
> error 'java.lang.OutOfMemoryError: Requested array size exceeds VM limit'
> after reading 7 partitions in local mode.  In Yarn mode, it
> throws 'java.lang.OutOfMemoryError: Java heap space' error after reading
> 3 partitions.
>
> Any suggestion?
>
> PySpark Shell Command:pyspark --master local[4] --driver-memory 3G
> --jars / tmp/spark-xml_2.10-0.3.3.jar
>
>
>
> Dataframe Creation Command:   df = sqlContext.read.format('com.da
> tabricks.spark.xml').options(rowTag='GGL').load('GGL_1.2G.xml')
>
>
>
> 16/11/15 18:27:04 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID
> 1) in 25978 ms on localhost (1/10)
>
> 16/11/15 18:27:04 INFO NewHadoopRDD: Input split:
> hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:268435456+134217728
>
> 16/11/15 18:27:55 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2).
> 2309 bytes result sent to driver
>
> 16/11/15 18:27:55 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID
> 3, localhost, partition 3,ANY, 2266 bytes)
>
> 16/11/15 18:27:55 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
>
> 16/11/15 18:27:55 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID
> 2) in 51001 ms on localhost (2/10)
>
> 16/11/15 18:27:55 INFO NewHadoopRDD: Input split:
> hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:402653184+134217728
>
> 16/11/15 18:28:19 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3).
> 2309 bytes result sent to driver
>
> 16/11/15 18:28:19 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID
> 4, localhost, partition 4,ANY, 2266 bytes)
>
> 16/11/15 18:28:19 INFO Executor: Running task 4.0 in stage 0.0 (TID 4)
>
> 16/11/15 18:28:19 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID
> 3) in 24336 ms on localhost (3/10)
>
> 16/11/15 18:28:19 INFO NewHadoopRDD: Input split:
> hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:536870912+134217728
>
> 16/11/15 18:28:40 INFO Executor: Finished task 4.0 in stage 0.0 (TID 4).
> 2309 bytes result sent to driver
>
> 16/11/15 18:28:40 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID
> 5, localhost, partition 5,ANY, 2266 bytes)
>
> 16/11/15 18:28:40 INFO Executor: Running task 5.0 in stage 0.0 (TID 5)
>
> 16/11/15 18:28:40 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID
> 4) in 20895 ms on localhost (4/10)
>
> 16/11/15 18:28:40 INFO NewHadoopRDD: Input split:
> hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:671088640+134217728
>
> 16/11/15 18:29:01 INFO Executor: Finished task 5.0 in stage 0.0 (TID 5).
> 2309 bytes result sent to driver
>
> 16/11/15 18:29:01 INFO TaskSetManager: Starting task 6.0 in stage 0.0 (TID
> 6, localhost, partition 6,ANY, 2266 bytes)
>
> 16/11/15 18:29:01 INFO Executor: Running task 6.0 in stage 0.0 (TID 6)
>
> 16/11/15 18:29:01 INFO TaskSetManager: Finished task 5.0 in stage 0.0 (TID
> 5) in 20793 ms on localhost (5/10)
>
> 16/11/15 18:29:01 INFO NewHadoopRDD: Input split:
> hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:805306368+134217728
>
> 16/11/15 18:29:22 INFO Executor: Finished task 6.0 in stage 0.0 (TID 6).
> 2309 bytes result sent to driver
>
> 16/11/15 18:29:22 INFO TaskSetManager: Starting task 7.0 in stage 0.0 (TID
> 7, localhost, partition 7,ANY, 2266 bytes)
>
> 16/11/15 18:29:22 INFO Executor: Running task 7.0 in stage 0.0 (TID 7)
>
> 16/11/15 18:29:22 INFO TaskSetManager: Finished task 6.0 in stage 0.0 (TID
> 6) in 21306 ms on localhost (6/10)
>
> 16/11/15 18:29:22 INFO NewHadoopRDD: Input split:
> hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:939524096+134217728
>
> 16/11/15 18:29:43 INFO Executor: Finished task 7.0 in stage 0.0 (TID 7).
> 2309 bytes result sent to driver
>
> 16/11/15 18:29:43 INFO TaskSetManager: Starting task 8.0 in stage 0.0 (TID
> 8, localhost, partition 8,ANY, 2266 bytes)
>
> 16/11/15 18:29:43 INFO Executor: Running task 8.0 in stage 0.0 (TID 8)
>
> 16/11/15 18:29:43 INFO TaskSetManager: Finished task 7.0 in stage 0.0 (TID
> 7) in 21130 ms on localhost (7/10)
>
> 16/11/15 18:29:43 INFO NewHadoopRDD: Input split:
> hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:1073741824+134217728
>
> 16/11/15 18:29:48 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID
> 0)
>
> 

Spark-xml - OutOfMemoryError: Requested array size exceeds VM limit

2016-11-15 Thread Arun Patel
I am trying to read an XML file which is 1GB in size.  I am getting an
error 'java.lang.OutOfMemoryError: Requested array size exceeds VM limit'
after reading 7 partitions in local mode.  In Yarn mode, it
throws 'java.lang.OutOfMemoryError: Java heap space' error after reading 3
partitions.

Any suggestion?

PySpark Shell Command:  pyspark --master local[4] --driver-memory 3G
--jars /tmp/spark-xml_2.10-0.3.3.jar



Dataframe Creation Command:   df = sqlContext.read.format('com.databricks.spark.xml').options(rowTag='GGL').load('GGL_1.2G.xml')



16/11/15 18:27:04 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID
1) in 25978 ms on localhost (1/10)

16/11/15 18:27:04 INFO NewHadoopRDD: Input split:
hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:268435456+134217728

16/11/15 18:27:55 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2).
2309 bytes result sent to driver

16/11/15 18:27:55 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID
3, localhost, partition 3,ANY, 2266 bytes)

16/11/15 18:27:55 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)

16/11/15 18:27:55 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID
2) in 51001 ms on localhost (2/10)

16/11/15 18:27:55 INFO NewHadoopRDD: Input split:
hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:402653184+134217728

16/11/15 18:28:19 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3).
2309 bytes result sent to driver

16/11/15 18:28:19 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID
4, localhost, partition 4,ANY, 2266 bytes)

16/11/15 18:28:19 INFO Executor: Running task 4.0 in stage 0.0 (TID 4)

16/11/15 18:28:19 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID
3) in 24336 ms on localhost (3/10)

16/11/15 18:28:19 INFO NewHadoopRDD: Input split:
hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:536870912+134217728

16/11/15 18:28:40 INFO Executor: Finished task 4.0 in stage 0.0 (TID 4).
2309 bytes result sent to driver

16/11/15 18:28:40 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID
5, localhost, partition 5,ANY, 2266 bytes)

16/11/15 18:28:40 INFO Executor: Running task 5.0 in stage 0.0 (TID 5)

16/11/15 18:28:40 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID
4) in 20895 ms on localhost (4/10)

16/11/15 18:28:40 INFO NewHadoopRDD: Input split:
hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:671088640+134217728

16/11/15 18:29:01 INFO Executor: Finished task 5.0 in stage 0.0 (TID 5).
2309 bytes result sent to driver

16/11/15 18:29:01 INFO TaskSetManager: Starting task 6.0 in stage 0.0 (TID
6, localhost, partition 6,ANY, 2266 bytes)

16/11/15 18:29:01 INFO Executor: Running task 6.0 in stage 0.0 (TID 6)

16/11/15 18:29:01 INFO TaskSetManager: Finished task 5.0 in stage 0.0 (TID
5) in 20793 ms on localhost (5/10)

16/11/15 18:29:01 INFO NewHadoopRDD: Input split:
hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:805306368+134217728

16/11/15 18:29:22 INFO Executor: Finished task 6.0 in stage 0.0 (TID 6).
2309 bytes result sent to driver

16/11/15 18:29:22 INFO TaskSetManager: Starting task 7.0 in stage 0.0 (TID
7, localhost, partition 7,ANY, 2266 bytes)

16/11/15 18:29:22 INFO Executor: Running task 7.0 in stage 0.0 (TID 7)

16/11/15 18:29:22 INFO TaskSetManager: Finished task 6.0 in stage 0.0 (TID
6) in 21306 ms on localhost (6/10)

16/11/15 18:29:22 INFO NewHadoopRDD: Input split:
hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:939524096+134217728

16/11/15 18:29:43 INFO Executor: Finished task 7.0 in stage 0.0 (TID 7).
2309 bytes result sent to driver

16/11/15 18:29:43 INFO TaskSetManager: Starting task 8.0 in stage 0.0 (TID
8, localhost, partition 8,ANY, 2266 bytes)

16/11/15 18:29:43 INFO Executor: Running task 8.0 in stage 0.0 (TID 8)

16/11/15 18:29:43 INFO TaskSetManager: Finished task 7.0 in stage 0.0 (TID
7) in 21130 ms on localhost (7/10)

16/11/15 18:29:43 INFO NewHadoopRDD: Input split:
hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:1073741824+134217728

16/11/15 18:29:48 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)

java.lang.OutOfMemoryError: Requested array size exceeds VM limit

        at java.util.Arrays.copyOf(Arrays.java:2271)

        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)

        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)

        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:122)

        at java.io.DataOutputStream.write(DataOutputStream.java:88)

        at com.databricks.spark.xml.XmlRecordReader.readUntilMatch(XmlInputFormat.scala:188)

        at com.databricks.spark.xml.XmlRecordReader.next(XmlInputFormat.scala:156)

        at com.databricks.spark.xml.XmlRecordReader.nextKeyValue(XmlInputFormat.scala:141)

        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:168)

        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)

        at 

Access broadcast variable from within function passed to reduceByKey

2016-11-15 Thread coolgar
For example:

rows.reduceByKey(reduceKeyMapFunction)

def reduceKeyMapFunction(log1: Map[String, Long], log2: Map[String, Long]): Map[String, Long] = {
  val bcast = broadcastv.value          // broadcast created on the driver
  val countFields = dbh.getCountFields
  // build and return the merged aggregate instead of discarding the result of ++
  countFields.map { f =>
    val valueSum = log1(f) + log2(f)
    f -> valueSum
  }.toMap
}

I can't pass the broadcast to the reduceKeyMapFunction. I create the broadcast
variable (broadcastv) in the driver, but I fear it will not be initialized on
the workers where the reduceKeyMapFunction runs. I've tried this, but when
accessing broadcastv an NPE is thrown.

I can't pass it to the reduceKeyMapFunction because it can only accept two
params (log1, log2). 
Any ideas?
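
For reference, a minimal self-contained sketch (names and data are placeholders, not the code above) showing that a broadcast created on the driver can be read via .value inside a function used with reduceByKey, as long as the function only captures the broadcast handle:

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastReduceSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("broadcast-reduce-sketch"))

    // Broadcast created on the driver; only the handle is captured in the closure.
    val countFields = sc.broadcast(Seq("bytes", "errors"))

    val rows = sc.parallelize(Seq(
      ("hostA", Map("bytes" -> 10L, "errors" -> 1L)),
      ("hostA", Map("bytes" -> 5L, "errors" -> 0L))))

    // The function reads the broadcast lazily via .value when it runs on executors.
    def mergeLogs(a: Map[String, Long], b: Map[String, Long]): Map[String, Long] =
      countFields.value.map(f => f -> (a.getOrElse(f, 0L) + b.getOrElse(f, 0L))).toMap

    rows.reduceByKey(mergeLogs).collect().foreach(println)
    sc.stop()
  }
}

A common cause of an NPE in this situation is holding the broadcast in a field of a
singleton object that is only initialized on the driver; on the executors that object
is re-created and the field is still null.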






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Access-broadcast-variable-from-within-function-passed-to-reduceByKey-tp28082.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: sbt shenanigans for a Spark-based project

2016-11-15 Thread Marco Mistroni
Uhm, I removed the mvn repo and ivy folder as well. sbt seems to kick in, but for
some reason it cannot 'see' org.apache.spark-mllib and therefore my
compilation fails.
I have temporarily fixed it by placing the spark-mllib jar in my project's
\lib directory.
Perhaps I'll try to create a brand new Spark project and see if that makes
any difference.

thanks for assistance Don!
kr
 marco

On Mon, Nov 14, 2016 at 11:13 PM, Don Drake  wrote:

> I would remove your entire local Maven repo (~/.m2/repo in linux) and try
> again. I'm able to compile sample code with your build.sbt and sbt
> v.0.13.12.
>
> -Don
>
> On Mon, Nov 14, 2016 at 3:11 PM, Marco Mistroni 
> wrote:
>
>> uhm.sorry.. still same issues. this is hte new version
>>
>> name := "SparkExamples"
>> version := "1.0"
>> scalaVersion := "2.11.8"
>> val sparkVersion = "2.0.1"
>>
>> // Add a single dependency
>> libraryDependencies += "junit" % "junit" % "4.8" % "test"
>> libraryDependencies ++= Seq("org.slf4j" % "slf4j-api" % "1.7.5",
>> "org.slf4j" % "slf4j-simple" % "1.7.5",
>> "org.clapper" %% "grizzled-slf4j" % "1.0.2")
>> libraryDependencies += "org.apache.spark"%%"spark-core"   % sparkVersion
>> libraryDependencies += "org.apache.spark"%%"spark-streaming"   %
>> sparkVersion
>> libraryDependencies += "org.apache.spark"%%"spark-mllib"   %
>> sparkVersion
>> libraryDependencies += "org.apache.spark"%%"spark-streaming-flume-sink"
>> % "2.0.1"
>> libraryDependencies += "org.apache.spark"%%"spark-sql"   % sparkVersion
>>
>>
>> resolvers += "softprops-maven" at "http://dl.bintray.com/content
>> /softprops/maven"
>>
>> Still seeing these kinds of errors  which seems to lead to the fact that
>> somehow sbt is getting confused..
>>
>> C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:2:
>> object mllib is not a member of package org.apache.spark
>> [error] import org.apache.spark.mllib.linalg.{ Vector, Vectors }
>> [error] ^
>> [error] 
>> C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:3:
>> object mllib is not a member of package org.apache.spark
>> [error] import org.apache.spark.mllib.regression.LabeledPoint
>> [error] ^
>> [error] 
>> C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:4:
>> object classification is not a member of package org.apache.spark.ml
>> [error] import org.apache.spark.ml.classification.{
>> RandomForestClassifier, RandomForestClassificationModel }
>> [error]^
>> [error] 
>> C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:6:
>> object feature is not a member of package org.apache.spark.ml
>> [error] import org.apache.spark.ml.feature.{ StringIndexer,
>> IndexToString, VectorIndexer, VectorAssembler }
>> [error]^
>> [error] 
>> C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:7:
>> object evaluation is not a member of package org.apache.spark.ml
>> [error] import org.apache.spark.ml.evaluation.{ RegressionEvaluator,
>> MulticlassClassificationEvaluator }
>> [error]^
>> [error] 
>> C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:8:
>> object classification is not a member of package org.apache.spark.ml
>> [error] import org.apache.spark.ml.classification._
>> [error]^
>> [error] 
>> C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:9:
>> object tuning is not a member of package org.apache.spark.ml
>> [error] import org.apache.spark.ml.tuning.{ CrossValidator,
>> ParamGridBuilder }
>> [error]^
>> [error] 
>> C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:10:
>> object tuning is not a member of package org.apache.spark.ml
>> [error] import org.apache.spark.ml.tuning.{ ParamGridBuilder,
>> TrainValidationSplit }
>> [error]^
>> [error] 
>> C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:16:
>> object Pipeline is not a member of package org.apache.spark.ml
>> [error] import org.apache.spark.ml.{ Pipeline, PipelineModel }
>>
>> any other hints?
>>
>> thanks and regarsd
>>  marco
>>
>>
>>
>>
>> On Sun, Nov 13, 2016 at 10:52 PM, Don Drake  wrote:
>>
>>> I would upgrade your Scala version to 2.11.8 as Spark 2.0 uses Scala
>>> 2.11 by default.
>>>
>>> On Sun, Nov 13, 2016 at 3:01 PM, Marco Mistroni 
>>> wrote:
>>>
 HI all
  i have a small Spark-based project which at the moment depends on jar
 from Spark 1.6.0
 The project has few Spark examples plus one which depends on Flume
 libraries


 I am attempting to move to Spark 2.0, but i am having issues with
 my dependencies
 The stetup below works fine when compiled against 1.6.0 dependencies

 

Problem submitting a spark job using yarn-client as master

2016-11-15 Thread David Robison
I am trying to submit a spark job through the yarn-client master setting. The 
job gets created and submitted to the clients but immediately errors out. Here 
is the relevant portion of the log:

15:39:37,385 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Requesting a new application from cluster with 1 NodeManagers
15:39:37,397 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Verifying our application has not requested more than the maximum memory 
capability of the cluster (4608 MB per container)
15:39:37,398 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) Will 
allocate AM container, with 896 MB memory including 384 MB overhead
15:39:37,399 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Setting up container launch context for our AM
15:39:37,403 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Setting up the launch environment for our AM container
15:39:37,427 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Preparing resources for our AM container
15:39:37,845 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Source and destination file systems are the same. Not copying 
file:/opt/wildfly/modules/org/apache/hadoop/client/main/spark-yarn_2.10-1.6.2.jar
15:39:38,050 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Source and destination file systems are the same. Not copying 
file:/tmp/spark-fa954c4a-a6cd-4675-8610-67ce858b4842/__spark_conf__1435451360463636119.zip
15:39:38,102 INFO  [org.apache.spark.SecurityManager] (default task-1) Changing 
view acls to: wildfly,hdfs
15:39:38,105 INFO  [org.apache.spark.SecurityManager] (default task-1) Changing 
modify acls to: wildfly,hdfs
15:39:38,105 INFO  [org.apache.spark.SecurityManager] (default task-1) 
SecurityManager: authentication disabled; ui acls disabled; users with view 
permissions: Set(wildfly, hdfs); users with modify permissions: Set(wildfly, 
hdfs)
15:39:38,138 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Submitting application 5 to ResourceManager
15:39:38,256 INFO  [org.apache.hadoop.yarn.client.api.impl.YarnClientImpl] 
(default task-1) Submitted application application_1479240217825_0005
15:39:39,269 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Application report for application_1479240217825_0005 (state: ACCEPTED)
15:39:39,279 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1)
 client token: N/A
diagnostics: N/A
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1479242378159
final status: UNDEFINED
tracking URL: 
http://vb1.localdomain:8088/proxy/application_1479240217825_0005/
user: hdfs
15:39:40,285 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Application report for application_1479240217825_0005 (state: ACCEPTED)
15:39:41,290 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Application report for application_1479240217825_0005 (state: ACCEPTED)
15:39:42,295 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Application report for application_1479240217825_0005 (state: FAILED)
15:39:42,295 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1)
 client token: N/A
diagnostics: Application application_1479240217825_0005 
failed 2 times due to AM Container for appattempt_1479240217825_0005_02 
exited with  exitCode: -1000
For more detailed output, check application tracking 
page:http://vb1.localdomain:8088/cluster/app/application_1479240217825_0005Then,
 click on links to logs of each attempt.
Diagnostics: File 
file:/tmp/spark-fa954c4a-a6cd-4675-8610-67ce858b4842/__spark_conf__1435451360463636119.zip
 does not exist
java.io.FileNotFoundException: File 
file:/tmp/spark-fa954c4a-a6cd-4675-8610-67ce858b4842/__spark_conf__1435451360463636119.zip
 does not exist
at 
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:609)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:822)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:599)
at 
org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)


Notice that the file __spark_conf__1435451360463636119.zip is not copied 
because it exists, I believe on the hdfs. However when the client goes to fetch 
it, it is reporting that it does not exist, probably because it is trying to 
get it from "file:/tmp" not the hdfs. Any idea how I can get this to work?
Thanks, David

David R Robison
Senior Systems Engineer
O. +1 512 247 3700
M. +1 757 286 0022
david.robi...@psgglobal.net

Re: Spark Streaming: question on sticky session across batches ?

2016-11-15 Thread Manish Malhotra
Thanks!
On Tue, Nov 15, 2016 at 1:07 AM Takeshi Yamamuro 
wrote:

> - dev
>
> Hi,
>
> AFAIK, if you use RDDs only, you can control the partition mapping to some
> extent
> by using a partition key RDD[(key, data)].
> A defined partitioner distributes data into partitions depending on the
> key.
> As a good example to control partitions, you can see the GraphX code;
>
> https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala#L291
>
> GraphX holds `PartitionId` in edge RDDs to control the partition where
> edge data are.
>
> // maropu
>
>
> On Tue, Nov 15, 2016 at 5:19 AM, Manish Malhotra <
> manish.malhotra.w...@gmail.com> wrote:
>
> sending again.
> any help is appreciated !
>
> thanks in advance.
>
> On Thu, Nov 10, 2016 at 8:42 AM, Manish Malhotra <
> manish.malhotra.w...@gmail.com> wrote:
>
> Hello Spark Devs/Users,
>
> Im trying to solve the use case with Spark Streaming 1.6.2 where for every
> batch ( say 2 mins) data needs to go to the same reducer node after
> grouping by key.
> The underlying storage is Cassandra and not HDFS.
>
> This is a map-reduce job, where also trying to use the partitions of the
> Cassandra table to batch the data for the same partition.
>
> The requirement of sticky session/partition across batches is because the
> operations which we need to do, needs to read data for every key and then
> merge this with the current batch aggregate values. So, currently when
> there is no stickyness across batches, we have to read for every key, merge
> and then write back. and reads are very expensive. So, if we have sticky
> session, we can avoid read in every batch and have a cache of till last
> batch aggregates across batches.
>
> So, there are few options, can think of:
>
> 1. to change the TaskSchedulerImpl, as its using Random to identify the
> node for mapper/reducer before starting the batch/phase.
> Not sure if there is a custom scheduler way of achieving it?
>
> 2. Can custom RDD can help to find the node for the key-->node.
> there is a getPreferredLocation() method.
> But not sure, whether this will be persistent or can vary for some edge
> cases?
>
> Thanks in advance for you help and time !
>
> Regards,
> Manish
>
>
>
>
>
> --
> ---
> Takeshi Yamamuro
>
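
For reference, a minimal sketch of the keyed-partitioner idea described above, with a concrete key type as a placeholder; a fixed partitioner makes the key-to-partition mapping deterministic, though it does not pin partitions to particular nodes:

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// Every RDD that applies the same HashPartitioner sends a given key to the
// same partition index.
def partitionByKey(rdd: RDD[(String, Long)], numPartitions: Int): RDD[(String, Long)] =
  rdd.partitionBy(new HashPartitioner(numPartitions))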


Re: Cannot find Native Library in "cluster" deploy-mode

2016-11-15 Thread jtgenesis
This link helped solved the issue for me!

http://permalink.gmane.org/gmane.comp.lang.scala.spark.user/22700 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-find-Native-Library-in-cluster-deploy-mode-tp28072p28081.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark SQL UDF - passing map as a UDF parameter

2016-11-15 Thread Nirav Patel
Thanks. I tried the following versions. They both compile:

val colmap = map(idxMap.flatMap(en => Iterator(lit(en._1), lit(en._2))).toSeq: _*)
val colmap = map(idxMap.flatMap(x => x._1 :: x._2 :: Nil).toSeq.map(lit _): _*)

However, they fail on a dataframe action like `show` with an
org.apache.spark.SparkException: Task not serializable error. I think it's an
issue with interactive mode. I am using Zeppelin.

On Tue, Nov 15, 2016 at 12:38 AM, Takeshi Yamamuro 
wrote:

> Hi,
>
> Literal cannot handle Tuple2.
> Anyway, how about this?
>
> val rdd = sc.makeRDD(1 to 3).map(i => (i, 0))
> map(rdd.collect.flatMap(x => x._1 :: x._2 :: Nil).map(lit _): _*)
>
> // maropu
>
> On Tue, Nov 15, 2016 at 9:33 AM, Nirav Patel 
> wrote:
>
>> I am trying to use following API from Functions to convert a map into
>> column so I can pass it to UDF.
>>
>> map(cols: Column*): Column
>>
>> "Creates a new map column. The input columns must be grouped as key-value
>> pairs, e.g. (key1, value1, key2, value2, ...). The key columns must all
>> have the same data type, and can't be null. The value columns must all have
>> the same data type."
>>
>>
>> final val idxMap = idxMapRdd.collectAsMap
>> val colmap =  map(idxMapA.map(lit _): _*)
>>
>> But getting following error:
>>
>> :139: error: type mismatch;
>>  found   : Iterable[org.apache.spark.sql.Column]
>>  required: Seq[org.apache.spark.sql.Column]
>>val colmap =  map(idxMapArr.map(lit _): _*)
>>
>>
>> If I try:
>> val colmap =  map(idxMapArr.map(lit _).toSeq: _*)
>>
>> It says:
>>
>> java.lang.RuntimeException: Unsupported literal type class scala.Tuple2
>> (17.0,MBO)
>> at org.apache.spark.sql.catalyst.expressions.Literal$.apply(lit
>> erals.scala:57)
>> at org.apache.spark.sql.functions$.lit(functions.scala:101)
>> at $anonfun$1.apply(:153)
>>
>>
>>
>> What is the correct usage of a `map` api to convert hashmap into column?
>>
>>
>>
>>
>>
>>
>>
>
>
>
>
> --
> ---
> Takeshi Yamamuro
>




Re: Spark ML DataFrame API - need cosine similarity, how to convert to RDD Vectors?

2016-11-15 Thread Asher Krim
What language are you using? For Java, you might convert the dataframe to
an rdd using something like this:

df
.toJavaRDD()
.map(row -> (SparseVector)row.getAs(row.fieldIndex("columnName")));
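
In Scala (Spark 2.0), a similar sketch, assuming the column holds the new ml.linalg vectors and is named "features" (a placeholder):

import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}

// Convert the DataFrame's ml vectors into the mllib vectors expected by the
// RDD-based API (e.g. RowMatrix.columnSimilarities for cosine similarities).
val vecRdd = df.rdd.map { row =>
  OldVectors.fromML(row.getAs[MLVector]("features"))
}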

On Tue, Nov 15, 2016 at 1:06 PM, Russell Jurney 
wrote:

> I have two dataframes with common feature vectors and I need to get the
> cosine similarity of one against the other. It looks like this is possible
> in the RDD based API, mllib, but not in ml.
>
> So, how do I convert my sparse dataframe vectors into something spark
> mllib can use? I've searched, but haven't found anything.
>
> Thanks!
> --
> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io
>



-- 
Asher Krim
Senior Software Engineer


Log-loss for multiclass classification

2016-11-15 Thread janardhan shetty
Hi,

A best practice for multi-class classification is to evaluate the
model by *log-loss*.
Is there any JIRA or work going on to implement the same in

*MulticlassClassificationEvaluator*?

Currently it supports the following:
("f1" (default), "weightedPrecision", "weightedRecall", "accuracy")


Spark ML DataFrame API - need cosine similarity, how to convert to RDD Vectors?

2016-11-15 Thread Russell Jurney
I have two dataframes with common feature vectors and I need to get the
cosine similarity of one against the other. It looks like this is possible
in the RDD based API, mllib, but not in ml.

So, how do I convert my sparse dataframe vectors into something spark mllib
can use? I've searched, but haven't found anything.

Thanks!
-- 
Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io


Re: CSV to parquet preserving partitioning

2016-11-15 Thread Daniel Siegmann
Did you try unioning the datasets for each CSV into a single dataset? You
may need to put the directory name into a column so you can partition by it.
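
A minimal sketch of that idea; the paths, the header option, and the regex used to pull out the directory name are placeholder assumptions:

import org.apache.spark.sql.functions.{input_file_name, regexp_extract}

// Read every input directory in one pass and tag each row with its source directory.
val df = spark.read
  .option("header", "true")                                   // assumption
  .csv("/path/*/*.csv")
  .withColumn("dir", regexp_extract(input_file_name(), "/path/([^/]+)/", 1))

// One shuffle so each "dir" value ends up in a single task, which then writes a
// partitioned layout like hdfs://path/dir=dir1/part-...parquet
df.repartition(df.col("dir"))
  .write
  .partitionBy("dir")
  .parquet("hdfs://path")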

On Tue, Nov 15, 2016 at 8:44 AM, benoitdr 
wrote:

> Hello,
>
> I'm trying to convert a bunch of csv files to parquet, with the interesting
> case that the input csv files are already "partitioned" by directory.
> All the input files have the same set of columns.
> The input files structure looks like :
>
> /path/dir1/file1.csv
> /path/dir1/file2.csv
> /path/dir2/file3.csv
> /path/dir3/file4.csv
> /path/dir3/file5.csv
> /path/dir3/file6.csv
>
> I'd like to read those files and write their data to a parquet table in
> hdfs, preserving the partitioning (partitioned by input directory), and
> such that there is a single output file per partition.
> The output file structure should look like:
>
> hdfs://path/dir=dir1/part-r-xxx.gz.parquet
> hdfs://path/dir=dir2/part-r-yyy.gz.parquet
> hdfs://path/dir=dir3/part-r-zzz.gz.parquet
>
>
> The best solution I have found so far is to loop over the input
> directories, loading the csv files into a dataframe and writing the
> dataframe to the target partition.
> But this is not efficient: since I want a single output file per partition,
> the write to hdfs is a single task that blocks the loop.
> I wonder how to achieve this with a maximum of parallelism (and without
> shuffling the data in the cluster).
>
> Thanks !
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/CSV-to-parquet-preserving-partitioning-tp28078.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark R guidelines for non-spark functions and coxph (Cox Regression for Time-Dependent Covariates)

2016-11-15 Thread Shivaram Venkataraman
I think the answer to this depends on what granularity you want to run
the algorithm on. If it's on the entire Spark DataFrame and if you
expect the data frame to be very large, then it isn't easy to use the
existing R function. However, if you want to run the algorithm on
smaller subsets of the data, you can look at the support for UDFs we
have in SparkR at
http://spark.apache.org/docs/latest/sparkr.html#applying-user-defined-function

Thanks
Shivaram

On Tue, Nov 15, 2016 at 3:56 AM, pietrop  wrote:
> Hi all,
> I'm writing here after some intensive usage on pyspark and SparkSQL.
> I would like to use a well known function in the R world: coxph() from the
> survival package.
> From what I understood, I can't parallelize a function like coxph() because
> it isn't provided with the SparkR package. In other words, I should
> implement a SparkR compatible algorithm instead of using coxph().
> I have no chance to make coxph() parallelizable, right?
> More generally, I think this is true for any non-spark function which only
> accept data.frame format as the data input.
>
> Do you plan to implement the coxph() counterpart in Spark? The most useful
> version of this model is the Cox Regression Model for Time-Dependent
> Covariates, which is missing from ANY ML framework as far as I know.
>
> Thank you
>  Pietro
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-R-guidelines-for-non-spark-functions-and-coxph-Cox-Regression-for-Time-Dependent-Covariates-tp28077.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Running stress tests on spark cluster to avoid wild-goose chase later

2016-11-15 Thread Dave Jaffe
Mich-

Sparkperf from Databricks (https://github.com/databricks/spark-perf) is a good 
stress test, covering a wide range of Spark functionality but especially ML. 
I’ve tested it with Spark 1.6.0 on CDH 5.7. It may need some work for Spark 2.0.

Dave Jaffe

BigData Performance
VMware
dja...@vmware.com

From: Mich Talebzadeh 
Date: Tuesday, November 15, 2016 at 11:09 AM
To: "user @spark" 
Subject: Running stress tests on spark cluster to avoid wild-goose chase later

Hi,

This is rather a broad question.

We would like to run a set of stress tests against our Spark clusters to ensure 
that the build performs as expected before deploying the cluster.

The reasoning behind this is that users were reporting some ML jobs running on 
two equal clusters with different times; one cluster was behaving 
much worse than the other on the same workload.

This was eventually traced to a wrong BIOS setting at the hardware level and did not 
have anything to do with Spark itself.

So rather than spending a good while on a wild-goose chase, we would like to take 
the Spark app through some test cycles.

We have some ideas but appreciate some other feedbacks.

The current version is CDH 5.2.

Thanks

Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




Spark SQL and JDBC compatibility

2016-11-15 Thread herman...@teeupdata.com
Hi Everyone,

While reading data into Spark 2.0.0 data frames through the Calcite JDBC driver, 
depending on the Calcite JDBC connection property setup (lexical), sometimes the data 
frame query returns an empty result set and sometimes it errors out with an exception: 
java.sql.SQLException: Error while preparing statement…

One of the scenarios I noticed is that, with df.filter($"col" === "value"), the 
Spark-generated SQL has "where (col IS NOT NULL) and (col = 'value')", which fails the 
Calcite SQL parser. If (col IS NOT NULL) is removed, the query goes through fine.

So, has anybody encountered a similar SQL compatibility issue, especially with 
Calcite? Is it possible to make some configuration changes, on both the Spark 
and Calcite sides, to get them working together?
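One possible workaround (not something suggested in this thread, and the driver, URL
and table names below are only placeholders for whatever connection settings are
already in use) is to push the predicate into the JDBC source as a subquery, so that
Spark never wraps it with the extra IS NOT NULL clause:

// Spark treats the parenthesised query as the "table", so Calcite only parses
// the SQL written here; no filter is applied on the Spark side afterwards.
val df = spark.read
  .format("jdbc")
  .option("driver", "org.apache.calcite.jdbc.Driver")
  .option("url", "jdbc:calcite:model=/path/to/model.json")
  .option("dbtable", "(select * from mytable where col = 'value') t")
  .load()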

Thanks
Herman.



GraphX updating vertex property

2016-11-15 Thread Saliya Ekanayake
Hi,

I have created a property graph using GraphX. Each vertex has an integer
array as a property. I'd like to update the values of these arrays without
creating new graph objects.

Is this possible in Spark?
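For reference, GraphX graphs are immutable, so the arrays cannot be updated strictly
in place; the closest idiomatic approach is mapVertices, which returns a new Graph
object but reuses the edge structure, indices and partitioning and only rewrites the
vertex attributes. A minimal sketch (the toy graph and the increment are just
illustrations):

import org.apache.spark.graphx._

// Tiny graph whose vertex property is an Array[Int].
val vertices = sc.parallelize(Seq((1L, Array(0, 0)), (2L, Array(0, 0))))
val edges = sc.parallelize(Seq(Edge(1L, 2L, "link")))
val graph = Graph(vertices, edges)

// "Update" the arrays; only the vertex attribute values change.
val updated = graph.mapVertices { (id, arr) => arr.map(_ + 1) }
updated.vertices.collect.foreach { case (id, arr) => println(s"$id -> ${arr.mkString(",")}") }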

Thank you,
Saliya

-- 
Saliya Ekanayake, Ph.D
Applied Computer Scientist
Network Dynamics and Simulation Science Laboratory (NDSSL)
Virginia Tech, Blacksburg


Running stress tests on spark cluster to avoid wild-goose chase later

2016-11-15 Thread Mich Talebzadeh
Hi,

This is rather a broad question.

We would like to run a set of stress tests against our Spark clusters to
ensure that the build performs as expected before deploying the cluster.

Reasoning behind this is that the users were reporting some ML jobs running
on two equal clusters reporting back different times; one cluster was
behaving much worse than the other using the same workload.

This was eventually traced to wrong BIOS setting at hardware level and did
not have anything to do with Spark itself.

So rather than spending a good while on a wild-goose chase, we would like to
take the Spark app through some test cycles.

We have some ideas but would appreciate other feedback.

The current version is CDH 5.2.

Thanks

Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: Cannot find Native Library in "cluster" deploy-mode

2016-11-15 Thread jtgenesis
thanks for the feedback.

I tried your suggestion using the --files option, but still no luck. I've
also checked and made sure that the libraries the .so files depend on are
also there. When I look at the SparkUI/Environment tab,
`spark.executorEnv.LD_LIBRARY_PATH` is pointing to the correct libraries.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-find-Native-Library-in-cluster-deploy-mode-tp28072p28080.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Finding a Spark Equivalent for Pandas' get_dummies

2016-11-15 Thread neil90
You can have a list of all the columns and pass it to a recursive function to
fit and apply the transformation for each column.
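For example, something along these lines (a sketch only; the column names are made up,
and OneHotEncoder here is the pre-2.3 single-column transformer) builds a
StringIndexer/OneHotEncoder pair per categorical column and chains them in one
Pipeline, which is roughly the Spark counterpart of get_dummies:

import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

val categoricalCols = Seq("color", "size", "shape")   // hypothetical column names

// One indexer + encoder per column, collected into a single pipeline.
val stages: Seq[PipelineStage] = categoricalCols.flatMap { c =>
  Seq(
    new StringIndexer().setInputCol(c).setOutputCol(s"${c}_idx"),
    new OneHotEncoder().setInputCol(s"${c}_idx").setOutputCol(s"${c}_vec")
  )
}

val pipeline = new Pipeline().setStages(stages.toArray)
// val encoded = pipeline.fit(df).transform(df)   // df is the input DataFrame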





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Finding-a-Spark-Equivalent-for-Pandas-get-dummies-tp28064p28079.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



SQL analyzer breakdown

2016-11-15 Thread Koert Kuipers
We see the analyzer break down almost without fail once programs reach a
certain size or complexity. It starts complaining with messages along the
lines of "cannot find column x#255 in list of columns that includes x#255".
The workaround is to go to rdd and back. Is there a way to achieve the same
(force the analyzer to break it up in chunks that are done independently)
without the roundtrip to rdd and back? Like a checkpoint for the analyzer
so to speak.
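For reference, the rdd-and-back workaround mentioned above can at least be wrapped in
a small helper that keeps the schema, so the "checkpoint" is a one-liner at the call
site (a sketch, assuming the Spark 2.x SparkSession API):

import org.apache.spark.sql.{DataFrame, SparkSession}

// Rebuilds the DataFrame from its RDD, giving the analyzer a fresh, flat
// logical plan while keeping the same schema and data.
def planCheckpoint(df: DataFrame, spark: SparkSession): DataFrame =
  spark.createDataFrame(df.rdd, df.schema)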


Re: Joining to a large, pre-sorted file

2016-11-15 Thread Stuart White
It seems that the number of files could possibly get out of hand using this
approach.

For example, in the job that buckets and writes master, assuming we use the
default number of shuffle partitions (200), and assuming that in the next
job (the job where we join to transaction), we're also going to want to use
200 partitions, that means master would be written to disk in 40,000 files
(200 partitions, each writing 200 bucket files).  Am I mistaken?

Is there some way to avoid this explosion of the number of files?  Or is
this just an unavoidable side-effect of Spark's bucketing implementation?
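One thing that may help (an assumption on my part, not verified against the bucketing
code): repartition on the bucket column before the bucketed write, so that each writer
task only holds the rows of the buckets it will actually write and therefore emits far
fewer files. A sketch, where masterDF stands in for the master DataFrame:

import org.apache.spark.sql.functions.col

masterDF
  .repartition(3, col("key"))     // co-locate each key's rows in one task first
  .write
  .bucketBy(3, "key")
  .sortBy("key")
  .saveAsTable("master")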

Thanks again!

On Sun, Nov 13, 2016 at 9:24 AM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

> Hi Stuart,
>
> Yes that's the query plan but if you take a look at my screenshot it skips
> the first stage since the datasets are co-partitioned.
>
> Thanks,
> Silvio
> --
> *From:* Stuart White 
> *Sent:* Saturday, November 12, 2016 11:20:28 AM
> *To:* Silvio Fiorito
> *Cc:* user@spark.apache.org
> *Subject:* Re: Joining to a large, pre-sorted file
>
> Hi Silvio,
>
> Thanks very much for the response!
>
> I'm pretty new at reading explain plans, so maybe I'm misunderstanding
> what I'm seeing.
>
> Remember my goal is to sort master, write it out, later read it back in
> and have Spark "remember" that it's sorted, so I can do joins and Spark
> will not sort it again.
>
> Looking at the explain plan for the example job you provided, it looks to
> me like Spark is re-sorted master after reading it back in.  See the
> attachment for the Sort step I'm referring to.
>
> Am I misunderstanding the explain plan?
>
> Thanks again!
>
> On Sat, Nov 12, 2016 at 9:34 AM, Silvio Fiorito <
> silvio.fior...@granturing.com> wrote:
>
>> Hi Stuart,
>>
>>
>>
>> You don’t need the sortBy or sortWithinPartitions.
>>
>>
>>
>> https://databricks-prod-cloudfront.cloud.databricks.com/
>> public/4027ec902e239c93eaaa8714f173bcfc/594325853373464/8799
>> 01972425732/6861830365114179/latest.html
>>
>>
>>
>>
>>
>> This is what the job should look like:
>>
>>
>>
>> On 11/12/16, 8:40 AM, "Stuart White"  wrote:
>>
>>
>>
>> Thanks for the reply.
>>
>>
>>
>> I understand that I need to use bucketBy() to write my master file,
>>
>> but I still can't seem to make it work as expected.  Here's a code
>>
>> example for how I'm writing my master file:
>>
>>
>>
>> Range(0, 100)
>>
>>   .map(i => (i, s"master_$i"))
>>
>>   .toDF("key", "value")
>>
>>   .write
>>
>>   .format("json")
>>
>>   .bucketBy(3, "key")
>>
>>   .sortBy("key")
>>
>>   .saveAsTable("master")
>>
>>
>>
>> And here's how I'm reading it later and attempting to join to a
>>
>> transaction dataset:
>>
>>
>>
>> val master = spark
>>
>>   .read
>>
>>   .format("json")
>>
>>   .json("spark-warehouse/master")
>>
>>   .cache
>>
>>
>>
>> val transaction = Range(0, 100)
>>
>>   .map(i => (i, s"transaction_$i"))
>>
>>   .toDF("key", "value")
>>
>>   .repartition(3, 'key)
>>
>>   .sortWithinPartitions('key)
>>
>>   .cache
>>
>>
>>
>> val results = master.join(transaction, "key")
>>
>>
>>
>> When I call results.explain(), I see that it is sorting both datasets
>>
>> before sending them through SortMergeJoin.
>>
>>
>>
>> == Physical Plan ==
>>
>> *Project [key#0L, value#1, value#53]
>>
>> +- *SortMergeJoin [key#0L], [cast(key#52 as bigint)], Inner
>>
>>   :- *Sort [key#0L ASC], false, 0
>>
>>:  +- Exchange hashpartitioning(key#0L, 200)
>>
>>: +- *Filter isnotnull(key#0L)
>>
>>:+- InMemoryTableScan [key#0L, value#1],
>> [isnotnull(key#0L)]
>>
>>:   :  +- InMemoryRelation [key#0L, value#1], true, 1,
>>
>> StorageLevel(disk, memory, deserialized, 1 replicas)
>>
>>:   : :  +- *Scan json [key#0L,value#1] Format: JSON,
>>
>> InputPaths: file:/work/spark-warehouse/master, PartitionFilters: [],
>>
>> PushedFilters: [], ReadSchema: struct
>>
>>+- *Sort [cast(key#52 as bigint) ASC], false, 0
>>
>>   +- Exchange hashpartitioning(cast(key#52 as bigint), 200)
>>
>>  +- InMemoryTableScan [key#52, value#53]
>>
>> :  +- InMemoryRelation [key#52, value#53], true, 1,
>>
>> StorageLevel(disk, memory, deserialized, 1 replicas)
>>
>> : :  +- *Sort [key#52 ASC], false, 0
>>
>> : : +- Exchange hashpartitioning(key#52, 3)
>>
>> : :+- LocalTableScan [key#52, value#53]
>>
>>
>>
>> Here are my thoughts:
>>
>> 1. I think I'm probably reading the master file back into memory
>>
>> incorrectly.  I think maybe I should be reading it as a Hive table
>>
>> rather than just a plain json file, but I can't seem to figure out how
>>
>> to do that.
>>
>> 2. I don't 

creating a javaRDD using newAPIHadoopFile and FixedLengthInputFormat

2016-11-15 Thread David Robison
I am trying to create a Spark JavaRDD using newAPIHadoopFile and 
FixedLengthInputFormat. Here is my code snippet:

Configuration config = new Configuration();
config.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, JPEG_INDEX_SIZE);
config.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
String fileFilter = config.get("fs.defaultFS") + "/A/B/C/*.idx";
JavaPairRDD<LongWritable, BytesWritable> inputRDD =
    sparkContext.newAPIHadoopFile(fileFilter, FixedLengthInputFormat.class,
        LongWritable.class, BytesWritable.class, config);

At this point I get the following exception:

Error executing mapreduce job: 
com.fasterxml.jackson.databind.JsonMappingException: Infinite recursion 
(StackOverflowError)

Any idea what I am doing wrong? I am new to Spark. David

David R Robison
Senior Systems Engineer
O. +1 512 247 3700
M. +1 757 286 0022
david.robi...@psgglobal.net
www.psgglobal.net

Prometheus Security Group Global, Inc.
3019 Alvin Devane Boulevard
Building 4, Suite 450
Austin, TX 78741




CSV to parquet preserving partitioning

2016-11-15 Thread benoitdr
Hello,

I'm trying to convert a bunch of csv files to parquet, with the interesting
case that the input csv files are already "partitioned" by directory.
All the input files have the same set of columns.
The input files structure looks like :

/path/dir1/file1.csv
/path/dir1/file2.csv
/path/dir2/file3.csv
/path/dir3/file4.csv
/path/dir3/file5.csv
/path/dir3/file6.csv

I'd like to read those files and write their data to a parquet table in
hdfs, preserving the partitioning (partitioned by input directory), and such
that there is a single output file per partition.
The output file structure should look like:

hdfs://path/dir=dir1/part-r-xxx.gz.parquet
hdfs://path/dir=dir2/part-r-yyy.gz.parquet
hdfs://path/dir=dir3/part-r-zzz.gz.parquet


The best solution I have found so far is to loop over the input
directories, loading the csv files into a dataframe and writing the dataframe
to the target partition.
But this is not efficient: since I want a single output file per partition,
the write to hdfs is a single task that blocks the loop.
I wonder how to achieve this with maximum parallelism (and without
shuffling the data in the cluster).
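One single-pass approach that might work (a sketch under assumptions: /path is a
placeholder, the directory name is recoverable from the file path via
input_file_name(), and all files share one schema): derive the partition column from
the input path, repartition on it so that all rows of one directory land in one task,
and let partitionBy lay out the output directories:

import org.apache.spark.sql.functions.{col, input_file_name, regexp_extract}

val df = spark.read
  .option("header", "true")
  .csv("/path/*/*.csv")
  .withColumn("dir", regexp_extract(input_file_name(), "/path/([^/]+)/", 1))

df.repartition(col("dir"))        // all rows of a given directory end up in one task
  .write
  .partitionBy("dir")
  .parquet("hdfs://path")

Note that the repartition does shuffle the data once; a single output file per
partition without any shuffle is hard to get, because the rows of one directory have
to end up in one task.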

Thanks !



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/CSV-to-parquet-preserving-partitioning-tp28078.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Exclude certain data from Training Data - Mlib

2016-11-15 Thread Bhaarat Sharma
I have my data in two tables: colors and excluded_colors.

colors contains all colors; excluded_colors contains some colors that I wish
to exclude from my training set.

I am trying to split the data into a training and a testing set and ensure
that the colors in excluded_colors are not in my training set but do exist in
the testing set.
In order to achieve the above, I did this

var colors = spark.sql("""
  select colors.*
  from colors
  LEFT JOIN excluded_colors
  ON excluded_colors.color_id = colors.color_id
  where excluded_colors.color_id IS NULL
""")

val trainer: (Int => Int) = (arg: Int) => 0
val sqlTrainer = udf(trainer)
val tester: (Int => Int) = (arg: Int) => 1
val sqlTester = udf(tester)

val splits = colors.randomSplit(Array(0.7, 0.3))
val train_colors = splits(0).select("color_id").withColumn("test", sqlTrainer(col("color_id")))
val test_colors = splits(1).select("color_id").withColumn("test", sqlTester(col("color_id")))


However, I'm realizing that by doing the above the colors in
excluded_colors are
completely ignored. They are not even in my testing set.

How can I split the data 70/30 while also ensuring that the colors in
excluded_colors are not in training but are present in testing?
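One way to get that behaviour (a sketch only, assuming Spark 2.0 and that both tables
are registered; not tested against your data): split only the non-excluded colors
70/30, then union the excluded colors into the test side so they can never appear in
training:

val excludedIds = spark.table("excluded_colors").select("color_id")
val allColors   = spark.table("colors")

// Rows whose color_id is absent from / present in excluded_colors.
val allowed  = allColors.join(excludedIds, Seq("color_id"), "leftanti")
val excluded = allColors.join(excludedIds, Seq("color_id"), "leftsemi")

val Array(trainSet, testAllowed) = allowed.randomSplit(Array(0.7, 0.3))
val testSet = testAllowed.union(excluded)   // excluded colors only ever appear in test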


Re: scala.MatchError while doing BinaryClassificationMetrics

2016-11-15 Thread Bhaarat Sharma
Thank, Nick.

This worked for me.

val evaluator = new BinaryClassificationEvaluator()
  .setLabelCol("label")
  .setRawPredictionCol("ModelProbability")
  .setMetricName("areaUnderROC")

val auROC = evaluator.evaluate(testResults)

On Mon, Nov 14, 2016 at 4:00 PM, Nick Pentreath 
wrote:

> Typically you pass in the result of a model transform to the evaluator.
>
> So:
> val model = estimator.fit(data)
> val auc = evaluator.evaluate(model.transform(testData))
>
> Check Scala API docs for some details: http://spark.apache.
> org/docs/latest/api/scala/index.html#org.apache.spark.ml.evaluation.
> BinaryClassificationEvaluator
>
> On Mon, 14 Nov 2016 at 20:02 Bhaarat Sharma  wrote:
>
> Can you please suggest how I can use BinaryClassificationEvaluator? I
> tried:
>
> scala> import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
> import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
>
> scala>  val evaluator = new BinaryClassificationEvaluator()
> evaluator: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator =
> binEval_0d57372b7579
>
> Try 1:
>
> scala> evaluator.evaluate(testScoreAndLabel.rdd)
> <console>:105: error: type mismatch;
>  found   : org.apache.spark.rdd.RDD[(Double, Double)]
>  required: org.apache.spark.sql.Dataset[_]
>evaluator.evaluate(testScoreAndLabel.rdd)
>
> Try 2:
>
> scala> evaluator.evaluate(testScoreAndLabel)
> java.lang.IllegalArgumentException: Field "rawPrediction" does not exist.
>   at org.apache.spark.sql.types.StructType$$anonfun$apply$1.
> apply(StructType.scala:228)
>
> Try 3:
>
> scala> evaluator.evaluate(testScoreAndLabel.select("
> Label","ModelProbability"))
> org.apache.spark.sql.AnalysisException: cannot resolve '`Label`' given
> input columns: [_1, _2];
>   at org.apache.spark.sql.catalyst.analysis.package$
> AnalysisErrorAt.failAnalysis(package.scala:42)
>
>
> On Mon, Nov 14, 2016 at 1:44 PM, Nick Pentreath 
> wrote:
>
> DataFrame.rdd returns an RDD[Row]. You'll need to use map to extract the
> doubles from the test score and label DF.
>
> But you may prefer to just use spark.ml evaluators, which work with
> DataFrames. Try BinaryClassificationEvaluator.
>
> On Mon, 14 Nov 2016 at 19:30, Bhaarat Sharma  wrote:
>
> I am getting scala.MatchError in the code below. I'm not able to see why
> this would be happening. I am using Spark 2.0.1
>
> scala> testResults.columns
> res538: Array[String] = Array(TopicVector, subject_id, hadm_id, isElective, 
> isNewborn, isUrgent, isEmergency, isMale, isFemale, oasis_score, 
> sapsii_score, sofa_score, age, hosp_death, test, ModelFeatures, Label, 
> rawPrediction, ModelProbability, ModelPrediction)
>
> scala> testResults.select("Label","ModelProbability").take(1)
> res542: Array[org.apache.spark.sql.Row] = 
> Array([0.0,[0.737304818744076,0.262695181255924]])
>
> scala> val testScoreAndLabel = testResults.
>  | select("Label","ModelProbability").
>  | map { case Row(l:Double, p:Vector) => (p(1), l) }
> testScoreAndLabel: org.apache.spark.sql.Dataset[(Double, Double)] = [_1: 
> double, _2: double]
>
> scala> testScoreAndLabel
> res539: org.apache.spark.sql.Dataset[(Double, Double)] = [_1: double, _2: 
> double]
>
> scala> testScoreAndLabel.columns
> res540: Array[String] = Array(_1, _2)
>
> scala> val testMetrics = new 
> BinaryClassificationMetrics(testScoreAndLabel.rdd)
> testMetrics: org.apache.spark.mllib.evaluation.BinaryClassificationMetrics = 
> org.apache.spark.mllib.evaluation.BinaryClassificationMetrics@36e780d1
>
> The code below gives the error
>
> val auROC = testMetrics.areaUnderROC() //this line gives the error
>
> Caused by: scala.MatchError: [0.0,[0.7316583497453766,0.2683416502546234]] 
> (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
>
>
>


Spark R guidelines for non-spark functions and coxph (Cox Regression for Time-Dependent Covariates)

2016-11-15 Thread pietrop
Hi all,
I'm writing here after some intensive usage on pyspark and SparkSQL.
I would like to use a well known function in the R world: coxph() from the
survival package.
From what I understood, I can't parallelize a function like coxph() because
it isn't provided with the SparkR package. In other words, I should
implement a SparkR-compatible algorithm instead of using coxph().
I have no chance to make coxph() parallelizable, right?
More generally, I think this is true for any non-Spark function which only
accepts the data.frame format as its data input.

Do you plan to implement the coxph() counterpart in Spark? The most useful
version of this model is the Cox Regression Model for Time-Dependent
Covariates, which is missing from ANY ML framework as far as I know.

Thank you
 Pietro



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-R-guidelines-for-non-spark-functions-and-coxph-Cox-Regression-for-Time-Dependent-Covariates-tp28077.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Streaming - Stop when there's no more data

2016-11-15 Thread Ashic Mahtab
I'm using Spark Streaming to process a large number of files (10s of millions) 
from a single directory in S3. Using sparkContext.textFile or wholeTextFiles 
takes ages and doesn't do anything. Pointing Structured Streaming at that 
location seems to work, but after processing all the input, it waits for more. 
Is there a way to terminate the streaming app once all input has been exhausted?
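I'm not aware of a built-in "stop at end of input" option for Structured Streaming;
one workaround (a sketch only, and it assumes Spark 2.1+, where
StreamingQuery.lastProgress and numInputRows are available) is to poll the query's
progress from the driver and stop it after a few consecutive empty triggers:

// query is the StreamingQuery returned by df.writeStream...start() (assumed here)
var emptyTriggers = 0
while (emptyTriggers < 3) {           // three quiet progress reports in a row
  Thread.sleep(30000)
  val p = query.lastProgress
  if (p != null && p.numInputRows == 0) emptyTriggers += 1 else emptyTriggers = 0
}
query.stop()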


Fwd:

2016-11-15 Thread Anton Okolnychyi
Hi,

I have experienced a problem using the Datasets API in Spark 1.6, while
almost identical code works fine in Spark 2.0.
The problem is related to encoders and custom aggregators.

*Spark 1.6 (the aggregation produces an empty map):*

  implicit val intStringMapEncoder: Encoder[Map[Int, String]] =
ExpressionEncoder()
  // implicit val intStringMapEncoder: Encoder[Map[Int, String]] =
org.apache.spark.sql.Encoders.kryo[Map[Int, String]]

  val sparkConf = new SparkConf()
.setAppName("IVU DS Spark 1.6 Test")
.setMaster("local[4]")
  val sparkContext = new SparkContext(sparkConf)
  val sparkSqlContext = new SQLContext(sparkContext)

  import sparkSqlContext.implicits._

  val stopPointDS = Seq(TestStopPoint("33", 1, "id#1"), TestStopPoint("33",
2, "id#2")).toDS()

  val stopPointSequenceMap = new Aggregator[TestStopPoint, Map[Int,
String], Map[Int, String]] {
override def zero = Map[Int, String]()
override def reduce(map: Map[Int, String], stopPoint: TestStopPoint) = {
  map.updated(stopPoint.sequenceNumber, stopPoint.id)
}
override def merge(map: Map[Int, String], anotherMap: Map[Int, String])
= {
  map ++ anotherMap
}
override def finish(reduction: Map[Int, String]) = reduction
  }.toColumn

  val resultMap = stopPointDS
.groupBy(_.line)
.agg(stopPointSequenceMap)
.collect()
.toMap

In spark.sql.execution.TypedAggregateExpression.scala, I see that the
reduce is done correctly, but Spark cannot read the reduced values in the
merge phase.
If I replace the ExpressionEncoder with the Kryo-based one (commented out in the
code above), then it works fine.

*Spark 2.0 (works correctly):*

  val spark = SparkSession
.builder()
.appName("IVU DS Spark 2.0 Test")
.config("spark.sql.warehouse.dir", "file:///D://sparkSql")
.master("local[4]")
.getOrCreate()

  import spark.implicits._

  val stopPointDS = Seq(TestStopPoint("33", 1, "id#1"), TestStopPoint("33",
2, "id#2")).toDS()

  val stopPointSequenceMap = new Aggregator[TestStopPoint, Map[Int,
String], Map[Int, String]] {
override def zero = Map[Int, String]()
override def reduce(map: Map[Int, String], stopPoint: TestStopPoint) = {
  map.updated(stopPoint.sequenceNumber, stopPoint.id)
}
override def merge(map: Map[Int, String], anotherMap: Map[Int, String])
= {
  map ++ anotherMap
}
override def finish(reduction: Map[Int, String]) = reduction
override def bufferEncoder: Encoder[Map[Int, String]] =
ExpressionEncoder()
override def outputEncoder: Encoder[Map[Int, String]] =
ExpressionEncoder()
  }.toColumn

  val resultMap = stopPointDS
.groupByKey(_.line)
.agg(stopPointSequenceMap)
.collect()
.toMap
I know that Spark 1.6 has only a preview of the Datasets concept and a lot
changed in 2.0. However, I would like to know if I am doing anything wrong
in my 1.6 code.

Thanks in advance,
Anton


Simple "state machine" functionality using Scala or Python

2016-11-15 Thread Esa Heikkinen

Hello

Can anyone provide a simple example of how to implement "state machine" 
functionality using Scala or Python in Spark?


The sequence of the state machine would be like this:
1) Search for the first event of the log and its data
2) Based on the data of the first event, search for the second event of the log 
and its data
3) And so on...
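A minimal sketch of the idea in plain Scala (the LogEvent shape, event names and
matching rules are all made up for illustration; on Spark the same chain could run
over a collected, time-sorted log, or per key inside mapWithState for streaming):

case class LogEvent(eventType: String, data: String)

// Each "state" is a search over the log, parameterised by what the previous
// state found; chaining the searches with Option gives a tiny state machine.
def find(log: Seq[LogEvent], kind: String)(pred: LogEvent => Boolean): Option[LogEvent] =
  log.find(e => e.eventType == kind && pred(e))

val log = Seq(
  LogEvent("LOGIN",  "user=42"),
  LogEvent("CLICK",  "user=42 page=home"),
  LogEvent("LOGOUT", "user=42"))

val matched = for {
  first  <- find(log, "LOGIN")(_ => true)                       // state 1
  second <- find(log, "CLICK")(_.data.contains(first.data))     // state 2 uses state 1's data
  third  <- find(log, "LOGOUT")(_.data.contains(first.data))    // state 3
} yield (first, second, third)

println(matched)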

Regards
Esa Heikkinen

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming: question on sticky session across batches ?

2016-11-15 Thread Takeshi Yamamuro
- dev

Hi,

AFAIK, if you use RDDs only, you can control the partition mapping to some
extent
by using a partition key RDD[(key, data)].
A defined partitioner distributes data into partitions depending on the key.
As a good example to control partitions, you can see the GraphX code;
https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala#L291

GraphX holds `PartitionId` in edge RDDs to control the partition where edge
data are.
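As a small illustration of that point (a sketch; the stream and the partition count
are assumptions): reusing the same partitioner for the keyed RDD of every batch keeps
a given key in the same partition index across batches, although the scheduler still
decides which executor runs that partition, so this alone is not a full
node-stickiness guarantee:

import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(16)        // fixed across all batches

// keyedStream: a DStream[(String, Long)] produced earlier in the job (assumed)
val partitioned = keyedStream.transform { rdd =>
  rdd.partitionBy(partitioner)                   // key -> partition index is deterministic
}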

// maropu


On Tue, Nov 15, 2016 at 5:19 AM, Manish Malhotra <
manish.malhotra.w...@gmail.com> wrote:

> sending again.
> any help is appreciated !
>
> thanks in advance.
>
> On Thu, Nov 10, 2016 at 8:42 AM, Manish Malhotra <
> manish.malhotra.w...@gmail.com> wrote:
>
>> Hello Spark Devs/Users,
>>
>> Im trying to solve the use case with Spark Streaming 1.6.2 where for
>> every batch ( say 2 mins) data needs to go to the same reducer node after
>> grouping by key.
>> The underlying storage is Cassandra and not HDFS.
>>
>> This is a map-reduce job, where also trying to use the partitions of the
>> Cassandra table to batch the data for the same partition.
>>
>> The requirement of sticky session/partition across batches is because the
>> operations which we need to do, needs to read data for every key and then
>> merge this with the current batch aggregate values. So, currently when
>> there is no stickyness across batches, we have to read for every key, merge
>> and then write back. and reads are very expensive. So, if we have sticky
>> session, we can avoid read in every batch and have a cache of till last
>> batch aggregates across batches.
>>
>> So, there are few options, can think of:
>>
>> 1. to change the TaskSchedulerImpl, as its using Random to identify the
>> node for mapper/reducer before starting the batch/phase.
>> Not sure if there is a custom scheduler way of achieving it?
>>
>> 2. Can custom RDD can help to find the node for the key-->node.
>> there is a getPreferredLocation() method.
>> But not sure, whether this will be persistent or can vary for some edge
>> cases?
>>
>> Thanks in advance for you help and time !
>>
>> Regards,
>> Manish
>>
>
>


-- 
---
Takeshi Yamamuro


Re: Spark SQL UDF - passing map as a UDF parameter

2016-11-15 Thread Takeshi Yamamuro
Hi,

Literal cannot handle Tuple2.
Anyway, how about this?

val rdd = sc.makeRDD(1 to 3).map(i => (i, 0))
map(rdd.collect.flatMap(x => x._1 :: x._2 :: Nil).map(lit _): _*)

// maropu

On Tue, Nov 15, 2016 at 9:33 AM, Nirav Patel  wrote:

> I am trying to use following API from Functions to convert a map into
> column so I can pass it to UDF.
>
> map(cols: Column*): Column
>
> "Creates a new map column. The input columns must be grouped as key-value
> pairs, e.g. (key1, value1, key2, value2, ...). The key columns must all
> have the same data type, and can't be null. The value columns must all have
> the same data type."
>
>
> final val idxMap = idxMapRdd.collectAsMap
> val colmap =  map(idxMapA.map(lit _): _*)
>
> But getting following error:
>
> <console>:139: error: type mismatch;
>  found   : Iterable[org.apache.spark.sql.Column]
>  required: Seq[org.apache.spark.sql.Column]
>val colmap =  map(idxMapArr.map(lit _): _*)
>
>
> If I try:
> val colmap =  map(idxMapArr.map(lit _).toSeq: _*)
>
> It says:
>
> java.lang.RuntimeException: Unsupported literal type class scala.Tuple2
> (17.0,MBO)
> at org.apache.spark.sql.catalyst.expressions.Literal$.apply(
> literals.scala:57)
> at org.apache.spark.sql.functions$.lit(functions.scala:101)
> at $anonfun$1.apply(<console>:153)
>
>
>
> What is the correct usage of a `map` api to convert hashmap into column?
>
>
>
>
>
>
>




-- 
---
Takeshi Yamamuro


Re: How to read a Multi Line json object via Spark

2016-11-15 Thread Hyukjin Kwon
Hi Sree,


There is a blog about that,
http://searchdatascience.com/spark-adventures-1-processing-multi-line-json-files/

It is pretty old but I am sure that it is helpful.

Currently, the JSON datasource only supports JSON documents formatted
according to http://jsonlines.org/ (one document per line).

There is an issue open to support this
https://issues.apache.org/jira/browse/SPARK-18352
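Until that lands, the usual workaround (a sketch, assuming the files are small enough
to be read whole; the path is the one from the earlier mail) is to read each file as a
single string, flatten it onto one line, and feed the result to the existing JSON
reader:

// wholeTextFiles gives (path, fileContent) pairs; one record per file.
val oneDocPerRecord = sc.wholeTextFiles("/user/spark/test.json")
  .map { case (_, content) => content.replace("\n", " ") }

val df = sqlContext.read.json(oneDocPerRecord)
df.printSchema()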

I hope this is helpful.


Thanks.



2016-11-15 16:20 GMT+09:00 Sree Eedupuganti :

> I tried from Spark-Shell and i am getting the following error:
>
> Here is the test.json file:
>
> {
> "colorsArray": [{
> "red": "#f00",
> "green": "#0f0",
> "blue": "#00f",
> "cyan": "#0ff",
> "magenta": "#f0f",
> "yellow": "#ff0",
> "black": "#000"
> }]}
>
>
> scala> val jtex = 
> sqlContext.read.format("json").option("samplingRatio","1.0").load("/user/spark/test.json")
>
>jtex: org.apache.spark.sql.DataFrame = [_corrupt_record: string]
>
>
> Any suggestions please. Thanks.
> --
> Best Regards,
> Sreeharsha Eedupuganti
> Data Engineer
> innData Analytics Private Limited
>