Re: Joining a RDD to a Dataframe

2016-05-08 Thread Cyril Scetbon
Hi Ashish,

The issue is not related to converting an RDD to a DF. I did that. I was just 
asking if I should do it differently.

The issue is the exception thrown when using array_contains with a sql.Column 
instead of a literal value.

I found another way to do it using explode, as follows:

df.select(explode(df("addresses.id")).as("aid"), df("id"))
  .join(df_input, $"aid" === df_input("id"))
  .select(df("id"))

However, I'm wondering whether it produces roughly the same query, or a 
different one that is worse in terms of performance.

It would be great if someone could comment on it and maybe give me some advice.

Thank you.
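
For what it's worth, a hedged way to compare the two formulations (assuming the
Spark 1.5 DataFrame API) is to print and diff their physical plans:

// Sketch: explain(true) prints the logical, optimized, and physical plans,
// which makes it easy to see whether the explode+join version differs from
// the array_contains formulation.
df.select(explode(df("addresses.id")).as("aid"), df("id"))
  .join(df_input, $"aid" === df_input("id"))
  .select(df("id"))
  .explain(true)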

> On May 8, 2016, at 22:12, Ashish Dubey  wrote:
> 
> Is there any reason you don't want to convert it? I don't think a join between an RDD 
> and a DF is supported.
> 
> On Sat, May 7, 2016 at 11:41 PM, Cyril Scetbon wrote:
> Hi,
> 
> I have an RDD built during a Spark Streaming job and I'd like to join it to a 
> DataFrame (E/S input) to enrich it.
> It seems that I can't join the RDD and the DF without first converting the 
> RDD to a DF (tell me if I'm wrong). Here are the schemas of both DFs:
> 
> scala> df
> res32: org.apache.spark.sql.DataFrame = [f1: string, addresses: 
> array>, id: string]
> 
> scala> df_input
> res33: org.apache.spark.sql.DataFrame = [id: string]
> 
> scala> df_input.collect
> res34: Array[org.apache.spark.sql.Row] = Array([idaddress2], [idaddress12])
> 
> I can get the ids I want if I know the value to look for in addresses.id 
> using:
> 
> scala> df.filter(array_contains(df("addresses.id"), 
> "idaddress2")).select("id").collect
> res35: Array[org.apache.spark.sql.Row] = Array([], [YY])
> 
> However, when I try to join df_input and df using the previous filter as 
> the join condition, I get an exception:
> 
> scala> df.join(df_input, array_contains(df("adresses.id"), df_input("id")))
> java.lang.RuntimeException: Unsupported literal type class 
> org.apache.spark.sql.Column id
> at 
> org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:50)
> at 
> org.apache.spark.sql.functions$.array_contains(functions.scala:2452)
> ...
> 
> It seems that array_contains only supports literal arguments and does not 
> resolve a sql.Column to its value.
> 
> What's the best way to achieve what I want to do? (Also speaking in terms of 
> performance.)
> 
> Thanks



How big the spark stream window could be ?

2016-05-08 Thread kramer2...@126.com
We have some streaming data that needs to be processed, and we are considering
using Spark Streaming for it.

We need to generate three kinds of reports. The reports are based on:

1. The last 5 minutes of data
2. The last 1 hour of data
3. The last 24 hours of data

The reports are generated every 5 minutes.

After reading the docs, the most obvious approach seems to be setting up a
Spark Streaming job with a 5-minute batch interval and two windows of 1 hour
and 1 day.

But I am worried that one-hour and one-day windows may be too big. I do not
have much experience with Spark Streaming, so what window lengths do you use
in your environment?

Any official docs talking about this?
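
For reference, a minimal sketch of that setup, assuming a DStream named
stream already exists (the names are hypothetical), with every window sliding
by the 5-minute batch interval:

import org.apache.spark.streaming.{Minutes, StreamingContext}

// 5-minute batch interval; "sc" is the SparkContext.
val ssc = new StreamingContext(sc, Minutes(5))
// ... build "stream" (a DStream of records) from your source ...

val last5min = stream                                      // one batch = 5 minutes
val lastHour = stream.window(Minutes(60), Minutes(5))      // 1-hour window, 5-minute slide
val lastDay  = stream.window(Minutes(60 * 24), Minutes(5)) // 24-hour window, 5-minute slide

// Note: a plain window() keeps the whole window's data around; for the
// 24-hour window, an incremental variant such as reduceByKeyAndWindow with
// an inverse function (which requires checkpointing) is usually cheaper.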




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-big-the-spark-stream-window-could-be-tp26899.html




partitioner aware subtract

2016-05-08 Thread Raghava Mutharaju
Hello All,

We have two PairRDDs (rdd1, rdd2) which are hash partitioned on key (the
number of partitions is the same for both RDDs). We would like to subtract
rdd2 from rdd1.

The subtract code at
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
seems to wrap the elements of both RDDs as (x, null), where x is an element of
the RDD, and partition them. Then it makes use of subtractByKey(). This way,
the RDDs have to be repartitioned on x (which in our case is the key and value
combined). In our case, both RDDs are already hash partitioned on the key of
x. Can we take advantage of this with a PairRDD/HashPartitioner-aware
subtract, as sketched below? Is there a way to use mapPartitions() for this?
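
A minimal sketch of what such a subtract could look like, assuming both RDDs
really do share the same HashPartitioner and partition count (zipPartitions
requires identical partition counts; coPartitionedSubtract is a hypothetical
helper, not a Spark API):

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Partition-local set difference for co-partitioned pair RDDs. Matching
// (k, v) pairs land in the same partition index, so no shuffle is needed,
// unlike RDD.subtract, which repartitions both sides on (x, null).
def coPartitionedSubtract[K: ClassTag, V: ClassTag](
    rdd1: RDD[(K, V)], rdd2: RDD[(K, V)]): RDD[(K, V)] =
  rdd1.zipPartitions(rdd2, preservesPartitioning = true) { (left, right) =>
    val toRemove = right.toSet          // only one partition of rdd2 in memory
    left.filterNot(toRemove.contains)
  }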

We tried to broadcast rdd2 and use mapPartitions: we did a local set
difference between each partition of rdd1 and the broadcast rdd2 (inside
mapPartitions of rdd1). But this turned out to be memory-consuming and
inefficient. We did use destroy() on the broadcast value, but it does not help.

The current subtract method is slow for us: rdd1 and rdd2 are around 700MB
each, and the subtract takes around 14 seconds.

Any ideas on this issue are highly appreciated.

Regards,
Raghava.


Re: Joining a RDD to a Dataframe

2016-05-08 Thread Ashish Dubey
Is there any reason you don't want to convert it? I don't think a join between
an RDD and a DF is supported.
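
A hedged sketch of that conversion, assuming the streaming RDD is an
RDD[String] of ids (adjust to the real element type; sqlContext is assumed to
exist):

import sqlContext.implicits._

// Convert the RDD to a single-column DataFrame, then join it.
val rddDF = rdd.toDF("id")                 // "rdd" is the streaming RDD
val enriched = rddDF.join(df_input, "id")  // equi-join on the "id" column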

On Sat, May 7, 2016 at 11:41 PM, Cyril Scetbon 
wrote:

> Hi,
>
> I have an RDD built during a Spark Streaming job and I'd like to join it to
> a DataFrame (E/S input) to enrich it.
> It seems that I can't join the RDD and the DF without first converting the
> RDD to a DF (tell me if I'm wrong). Here are the schemas of both DFs:
>
> scala> df
> res32: org.apache.spark.sql.DataFrame = [f1: string, addresses:
> array>, id: string]
>
> scala> df_input
> res33: org.apache.spark.sql.DataFrame = [id: string]
>
> scala> df_input.collect
> res34: Array[org.apache.spark.sql.Row] = Array([idaddress2], [idaddress12])
>
> I can get the ids I want if I know the value to look for in addresses.id
> using:
>
> scala> df.filter(array_contains(df("addresses.id"),
> "idaddress2")).select("id").collect
> res35: Array[org.apache.spark.sql.Row] = Array([], [YY])
>
> However, when I try to join df_input and df using the previous filter
> as the join condition, I get an exception:
>
> scala> df.join(df_input, array_contains(df("adresses.id"),
> df_input("id")))
> java.lang.RuntimeException: Unsupported literal type class
> org.apache.spark.sql.Column id
> at
> org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:50)
> at
> org.apache.spark.sql.functions$.array_contains(functions.scala:2452)
> ...
>
> It seems that array_contains only supports literal arguments and does not
> resolve a sql.Column to its value.
>
> What's the best way to achieve what I want to do? (Also speaking in terms
> of performance.)
>
> Thanks


Re: sqlCtx.read.parquet yields lots of small tasks

2016-05-08 Thread Ashish Dubey
I see the behavior: it always launches the minimum total number of tasks your
settings allow (num-executors * num-cores). However, with a huge amount of
data you will see more tasks, which suggests there is some kind of lower bound
on the number of tasks. It may require some digging; other formats did not
seem to have this issue.
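
One hedged thing to try: the configuration quoted below enables
spark.sql.parquet.mergeSchema, and in Spark 1.5/1.6 schema merging can run as
a separate distributed job over the Parquet footers, which would explain an
extra stage whose task count tracks the cluster rather than the file.
Disabling merging per read is a cheap experiment:

// Sketch: per-read override of the global mergeSchema setting.
val df = sqlContext.read
  .option("mergeSchema", "false")
  .parquet(path_to_file)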

On Sun, May 8, 2016 at 12:10 AM, Johnny W.  wrote:

> The file size is very small (< 1M). The stage launches every time I call:
> --
> sqlContext.read.parquet(path_to_file)
>
> These are the parquet specific configurations I set:
> --
> spark.sql.parquet.filterPushdown: true
> spark.sql.parquet.mergeSchema: true
>
> Thanks,
> J.
>
> On Sat, May 7, 2016 at 4:20 PM, Ashish Dubey  wrote:
>
>> How big is your file and can you also share the code snippet
>>
>>
>> On Saturday, May 7, 2016, Johnny W.  wrote:
>>
>>> hi spark-user,
>>>
>>> I am using Spark 1.6.0. When I call sqlCtx.read.parquet to create a
>>> dataframe from a parquet data source with a single parquet file, it yields
>>> a stage with lots of small tasks. It seems the number of tasks depends on
>>> how many executors I have instead of how many parquet files/partitions I
>>> have. Actually, it launches 5 tasks on each executor.
>>>
>>> This behavior is quite strange, and it may cause problems if there is
>>> a slow executor. What is this "parquet" stage for, and why does it launch 5
>>> tasks on each executor?
>>>
>>> Thanks,
>>> J.
>>>
>>
>


Re: BlockManager crashing applications

2016-05-08 Thread Ashish Dubey
Caused by: java.io.IOException: Failed to connect to
ip-10-12-46-235.us-west-2.compute.internal/10.12.46.235:55681
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)


The above message indicates that there used to be an executor at that address,
and by the time another executor was about to read from it, it no longer
existed. You may be able to confirm this (if it is the case) by looking at the
Spark app UI - you may find dead executors.

On Sun, May 8, 2016 at 6:02 PM, Brandon White 
wrote:

> I'm not quite sure how this is a memory problem. There are no OOM
> exceptions, and the job only breaks when actions are run in parallel,
> submitted to the scheduler by different threads.
>
> The issue is that the doGetRemote function does not retry when it is
> denied access to a cache block.
> On May 8, 2016 5:55 PM, "Ashish Dubey"  wrote:
>
> Brandon,
>
> How much memory are you giving to your executors? Did you check whether
> there were dead executors in your application logs? Most likely your
> executors require more memory.
>
> Ashish
>
> On Sun, May 8, 2016 at 1:01 PM, Brandon White 
> wrote:
>
>> Hello all,
>>
>> I am running a Spark application which schedules multiple Spark jobs.
>> Something like:
>>
>> val df  = sqlContext.read.parquet("/path/to/file")
>>
>> filterExpressions.par.foreach { expression =>
>>   df.filter(expression).count()
>> }
>>
>> When the block manager fails to fetch a block, it throws an exception
>> which eventually kills the application: http://pastebin.com/2ggwv68P
>>
>> This code works when I run it on one thread with:
>>
>> filterExpressions.foreach { expression =>
>>   df.filter(expression).count()
>> }
>>
>> But I really need the parallel execution of the jobs. Is there any way
>> around this? It seems like a bug in the BlockManager's doGetRemote function.
>> I have tried the HTTP Block Manager as well.
>>
>
>


Re: BlockManager crashing applications

2016-05-08 Thread Brandon White
I'm not quite sure how this is a memory problem. There are no OOM
exceptions, and the job only breaks when actions are run in parallel,
submitted to the scheduler by different threads.

The issue is that the doGetRemote function does not retry when it is denied
access to a cache block.
On May 8, 2016 5:55 PM, "Ashish Dubey"  wrote:

Brandon,

How much memory are you giving to your executors? Did you check whether there
were dead executors in your application logs? Most likely your executors
require more memory.

Ashish

On Sun, May 8, 2016 at 1:01 PM, Brandon White 
wrote:

> Hello all,
>
> I am running a Spark application which schedules multiple Spark jobs.
> Something like:
>
> val df  = sqlContext.read.parquet("/path/to/file")
>
> filterExpressions.par.foreach { expression =>
>   df.filter(expression).count()
> }
>
> When the block manager fails to fetch a block, it throws an exception
> which eventually kills the application: http://pastebin.com/2ggwv68P
>
> This code works when I run it on one thread with:
>
> filterExpressions.foreach { expression =>
>   df.filter(expression).count()
> }
>
> But I really need the parallel execution of the jobs. Is there any way
> around this? It seems like a bug in the BlockManager's doGetRemote function.
> I have tried the HTTP Block Manager as well.
>


Re: BlockManager crashing applications

2016-05-08 Thread Ashish Dubey
Brandon,

How much memory are you giving to your executors? Did you check whether there
were dead executors in your application logs? Most likely your executors
require more memory.

Ashish

On Sun, May 8, 2016 at 1:01 PM, Brandon White 
wrote:

> Hello all,
>
> I am running a Spark application which schedules multiple Spark jobs.
> Something like:
>
> val df  = sqlContext.read.parquet("/path/to/file")
>
> filterExpressions.par.foreach { expression =>
>   df.filter(expression).count()
> }
>
> When the block manager fails to fetch a block, it throws an exception
> which eventually kills the application: http://pastebin.com/2ggwv68P
>
> This code works when I run it on one thread with:
>
> filterExpressions.foreach { expression =>
>   df.filter(expression).count()
> }
>
> But I really need the parallel execution of the jobs. Is there any way
> around this? It seems like a bug in the BlockManager's doGetRemote function.
> I have tried the HTTP Block Manager as well.
>


Re: Parse Json in Spark

2016-05-08 Thread Ashish Dubey
This limit is due to the underlying InputFormat implementation. You can always
write your own InputFormat and then use Spark's newAPIHadoopFile API to pass
your InputFormat class. You will have to place the jar file in the /lib
location on all the nodes.

Ashish

On Sun, May 8, 2016 at 4:02 PM, Hyukjin Kwon  wrote:

>
> I remember this Jira, https://issues.apache.org/jira/browse/SPARK-7366.
> Parsing multi-line JSON is not supported by the JSON data source.
>
> Instead this can be done by sc.wholeTextFiles(). I found some examples
> here,
> http://searchdatascience.com/spark-adventures-1-processing-multi-line-json-files
>
> Although this reads a file as a whole record, this should work.
>
> Thanks!
> On 9 May 2016 7:20 a.m., "KhajaAsmath Mohammed" 
> wrote:
>
>> Hi,
>>
>> I am working on parsing JSON in Spark, but most of the information
>> available online states that I need to have the entire JSON in a single line.
>>
>> In my case, the JSON file is delivered in a complex structure and not in a
>> single line. Does anyone know how to process this in Spark?
>>
>> I used the Jackson jar to process JSON and was able to do it when it is
>> in a single line. Any ideas?
>>
>> Thanks,
>> Asmath
>>
>


Re: Parse Json in Spark

2016-05-08 Thread Hyukjin Kwon
I remember this Jira, https://issues.apache.org/jira/browse/SPARK-7366.
Parsing multi-line JSON is not supported by the JSON data source.

Instead this can be done by sc.wholeTextFiles(). I found some examples
here,
http://searchdatascience.com/spark-adventures-1-processing-multi-line-json-files

Although this reads a file as a whole record, this should work.
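
A minimal sketch of that approach, assuming one (possibly multi-line) JSON
document per file; the path and the "id" field here are hypothetical:

import com.fasterxml.jackson.databind.ObjectMapper

// wholeTextFiles yields one (path, fileContent) pair per file, so a document
// can span as many lines as it likes. Parse with Jackson on the executors,
// creating one ObjectMapper per partition rather than per record.
val ids = sc.wholeTextFiles("/data/json/*").mapPartitions { iter =>
  val mapper = new ObjectMapper()
  iter.map { case (_, content) => mapper.readTree(content).path("id").asText() }
}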

Thanks!
On 9 May 2016 7:20 a.m., "KhajaAsmath Mohammed" 
wrote:

> Hi,
>
> I am working on parsing JSON in Spark, but most of the information
> available online states that I need to have the entire JSON in a single line.
>
> In my case, the JSON file is delivered in a complex structure and not in a
> single line. Does anyone know how to process this in Spark?
>
> I used the Jackson jar to process JSON and was able to do it when it is
> in a single line. Any ideas?
>
> Thanks,
> Asmath
>


Parse Json in Spark

2016-05-08 Thread KhajaAsmath Mohammed
Hi,

I am working on parsing JSON in Spark, but most of the information available
online states that I need to have the entire JSON in a single line.

In my case, the JSON file is delivered in a complex structure and not in a
single line. Does anyone know how to process this in Spark?

I used the Jackson jar to process JSON and was able to do it when it is in a
single line. Any ideas?

Thanks,
Asmath


BlockManager crashing applications

2016-05-08 Thread Brandon White
Hello all,

I am running a Spark application which schedules multiple Spark jobs.
Something like:

val df  = sqlContext.read.parquet("/path/to/file")

filterExpressions.par.foreach { expression =>
  df.filter(expression).count()
}

When the block manager fails to fetch a block, it throws an exception which
eventually kills the application: http://pastebin.com/2ggwv68P

This code works when I run it on one thread with:

filterExpressions.foreach { expression =>
  df.filter(expression).count()
}

But I really need the parallel execution of the jobs. Is there any way around
this? It seems like a bug in the BlockManager's doGetRemote function.
I have tried the HTTP Block Manager as well.
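
Until the root cause is fixed, one hedged application-level workaround is to
retry each action a few times, on the assumption that the failed block fetch
is transient (withRetries is a hypothetical helper, not a Spark API):

// Retry an action up to maxAttempts times before giving up.
def withRetries[T](maxAttempts: Int)(body: => T): T = {
  var lastError: Throwable = null
  var attempt = 0
  while (attempt < maxAttempts) {
    try {
      return body
    } catch {
      case e: Exception =>
        lastError = e
        attempt += 1
        Thread.sleep(1000)   // brief back-off before retrying
    }
  }
  throw lastError
}

filterExpressions.par.foreach { expression =>
  withRetries(3)(df.filter(expression).count())
}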


different SqlContext with same udf name with different meaning

2016-05-08 Thread Igor Berman
Hi,
suppose I have a multi-tenant environment and I want to give my users
additional functions, but for each user/tenant the meaning of the same
function depends on the user's specific configuration.

Is it possible to register the same function several times under different
SQLContexts? Can several SQLContexts live together?


Thanks in advance



Something like:

SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc)

sqlContext.udf().register("my_function", func1...);


SQLContext sqlContext2 = new org.apache.spark.sql.SQLContext(sc)

sqlContext2.udf().register("my_function", func2...);
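

A hedged Scala sketch of the same idea: in Spark 1.x each SQLContext keeps its
own function registry, so registering the same name in two contexts should not
collide (the UDF bodies here are placeholders):

import org.apache.spark.sql.SQLContext

val ctxA = new SQLContext(sc)
ctxA.udf.register("my_function", (s: String) => s.toUpperCase)  // tenant A's meaning

val ctxB = new SQLContext(sc)
ctxB.udf.register("my_function", (s: String) => s.toLowerCase)  // tenant B's meaning

// Queries issued through ctxA resolve "my_function" to tenant A's version,
// and queries issued through ctxB to tenant B's version.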


Re: How to verify if spark is using kryo serializer for shuffle

2016-05-08 Thread Nirav Patel
Yes, my mistake. I am using Spark 1.5.2, not 2.x.

I looked at the running Spark driver JVM process on Linux. It looks like my
settings are not being applied to the driver. We use the Oozie Spark action
to launch Spark; I will have to investigate that further.

Hopefully Spark has replaced, or will replace, the memory-hungry Java
serializer with a better streaming serializer.

Thanks
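
For reference, a hedged way to check at runtime which serializers are actually
in use (SparkEnv is a developer API, so treat this as a diagnostic sketch
rather than a stable interface):

import org.apache.spark.SparkEnv

// Run on the driver. In Spark 1.x the closure serializer is always the Java
// serializer, and Akka's RPC layer also uses Java serialization, which is
// why Java-serialization frames can show up in driver logs even when Kryo
// is configured for shuffle data.
println(SparkEnv.get.serializer.getClass.getName)         // shuffle data serializer
println(SparkEnv.get.closureSerializer.getClass.getName)  // closure serializer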

On Sun, May 8, 2016 at 9:33 AM, Ted Yu  wrote:

> See the following:
> [SPARK-7997][CORE] Remove Akka from Spark Core and Streaming
>
> I guess you meant you are using Spark 1.5.1
>
> For the time being, consider increasing spark.driver.memory
>
> Cheers
>
> On Sun, May 8, 2016 at 9:14 AM, Nirav Patel  wrote:
>
>> Yes, I am using YARN client mode, hence I specified AM settings too.
>> What do you mean, akka is moved out of the picture? I am using spark 2.5.1
>>
>> Sent from my iPhone
>>
>> On May 8, 2016, at 6:39 AM, Ted Yu  wrote:
>>
>> Are you using YARN client mode ?
>>
>> See
>> https://spark.apache.org/docs/latest/running-on-yarn.html
>>
>> In cluster mode, spark.yarn.am.memory is not effective.
>>
>> For Spark 2.0, akka is moved out of the picture.
>> FYI
>>
>> On Sat, May 7, 2016 at 8:24 PM, Nirav Patel 
>> wrote:
>>
>>> I have 20 executors, 6 cores each. There are 5 stages in total; it fails on
>>> the 5th one. All of them have 6474 tasks. The 5th stage is a count operation,
>>> and it also performs aggregateByKey as part of its lazy evaluation.
>>> I am setting:
>>> spark.driver.memory=10G, spark.yarn.am.memory=2G and
>>> spark.driver.maxResultSize=9G
>>>
>>>
>>> On a side note, could it be something to do with java serialization
>>> library, ByteArrayOutputStream using byte array? Can it be replaced by
>>> some better serializing library?
>>>
>>> https://bugs.openjdk.java.net/browse/JDK-8055949
>>> https://bugs.openjdk.java.net/browse/JDK-8136527
>>>
>>> Thanks
>>>
>>> On Sat, May 7, 2016 at 4:51 PM, Ashish Dubey 
>>> wrote:
>>>
 The driver maintains the complete metadata of the application (scheduling of
 executors and maintaining the messaging to control the execution). This code
 seems to be failing in that code path. That said, there is JVM overhead based
 on the number of executors, stages, and tasks in your app. Do you know your
 driver heap size and application structure (number of stages and tasks)?

 Ashish

 On Saturday, May 7, 2016, Nirav Patel  wrote:

> Right, but these logs are from the Spark driver, and the driver seems to use
> Akka.
>
> ERROR [sparkDriver-akka.actor.default-dispatcher-17]
> akka.actor.ActorSystemImpl: Uncaught fatal error from thread
> [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down
> ActorSystem [sparkDriver]
>
> I saw following logs before above happened.
>
> 2016-05-06 09:49:17,813 INFO
> [sparkDriver-akka.actor.default-dispatcher-17]
> org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output
> locations for shuffle 1 to hdn6.xactlycorporation.local:44503
>
>
> As far as I know, the driver is just driving the shuffle operation, not
> actually doing anything within its own system that would cause a memory
> issue. Can you explain in what circumstances I could see this error in the
> driver logs? I don't do any collect or any other driver operation that would
> cause this. It fails when doing an aggregateByKey operation, but that should
> happen in the executor JVM, NOT in the driver JVM.
>
>
> Thanks
>
> On Sat, May 7, 2016 at 11:58 AM, Ted Yu  wrote:
>
>> bq.   at akka.serialization.JavaSerializer.toBinary(
>> Serializer.scala:129)
>>
>> It was Akka which uses JavaSerializer
>>
>> Cheers
>>
>> On Sat, May 7, 2016 at 11:13 AM, Nirav Patel 
>> wrote:
>>
>>> Hi,
>>>
>>> I thought I was using kryo serializer for shuffle.  I could verify
>>> it from spark UI - Environment tab that
>>> spark.serializer org.apache.spark.serializer.KryoSerializer
>>> spark.kryo.registrator
>>> com.myapp.spark.jobs.conf.SparkSerializerRegistrator
>>>
>>>
>>> But when I see following error in Driver logs it looks like spark is
>>> using JavaSerializer
>>>
>>> 2016-05-06 09:49:26,490 ERROR
>>> [sparkDriver-akka.actor.default-dispatcher-17] 
>>> akka.actor.ActorSystemImpl:
>>> Uncaught fatal error from thread
>>> [sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down
>>> ActorSystem [sparkDriver]
>>>
>>> java.lang.OutOfMemoryError: Java heap space
>>>
>>> at java.util.Arrays.copyOf(Arrays.java:2271)
>>>
>>> at
>>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>>>
>>> at
>>> 

Re: How to verify if spark is using kryo serializer for shuffle

2016-05-08 Thread Ted Yu
See the following:
[SPARK-7997][CORE] Remove Akka from Spark Core and Streaming

I guess you meant you are using Spark 1.5.1

For the time being, consider increasing spark.driver.memory

Cheers

On Sun, May 8, 2016 at 9:14 AM, Nirav Patel  wrote:

> Yes, I am using YARN client mode, hence I specified AM settings too.
> What do you mean, akka is moved out of the picture? I am using spark 2.5.1
>
> Sent from my iPhone
>
> On May 8, 2016, at 6:39 AM, Ted Yu  wrote:
>
> Are you using YARN client mode ?
>
> See
> https://spark.apache.org/docs/latest/running-on-yarn.html
>
> In cluster mode, spark.yarn.am.memory is not effective.
>
> For Spark 2.0, akka is moved out of the picture.
> FYI
>
> On Sat, May 7, 2016 at 8:24 PM, Nirav Patel  wrote:
>
>> I have 20 executors, 6 cores each. There are 5 stages in total; it fails on
>> the 5th one. All of them have 6474 tasks. The 5th stage is a count operation,
>> and it also performs aggregateByKey as part of its lazy evaluation.
>> I am setting:
>> spark.driver.memory=10G, spark.yarn.am.memory=2G and
>> spark.driver.maxResultSize=9G
>>
>>
>> On a side note, could it be something to do with java serialization
>> library, ByteArrayOutputStream using byte array? Can it be replaced by
>> some better serializing library?
>>
>> https://bugs.openjdk.java.net/browse/JDK-8055949
>> https://bugs.openjdk.java.net/browse/JDK-8136527
>>
>> Thanks
>>
>> On Sat, May 7, 2016 at 4:51 PM, Ashish Dubey 
>> wrote:
>>
>>> The driver maintains the complete metadata of the application (scheduling of
>>> executors and maintaining the messaging to control the execution). This code
>>> seems to be failing in that code path. That said, there is JVM overhead based
>>> on the number of executors, stages, and tasks in your app. Do you know your
>>> driver heap size and application structure (number of stages and tasks)?
>>>
>>> Ashish
>>>
>>> On Saturday, May 7, 2016, Nirav Patel  wrote:
>>>
 Right, but these logs are from the Spark driver, and the driver seems to use
 Akka.

 ERROR [sparkDriver-akka.actor.default-dispatcher-17]
 akka.actor.ActorSystemImpl: Uncaught fatal error from thread
 [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down
 ActorSystem [sparkDriver]

 I saw following logs before above happened.

 2016-05-06 09:49:17,813 INFO
 [sparkDriver-akka.actor.default-dispatcher-17]
 org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output
 locations for shuffle 1 to hdn6.xactlycorporation.local:44503


 As far as I know, the driver is just driving the shuffle operation, not
 actually doing anything within its own system that would cause a memory issue.
 Can you explain in what circumstances I could see this error in the driver
 logs? I don't do any collect or any other driver operation that would cause
 this. It fails when doing an aggregateByKey operation, but that should happen
 in the executor JVM, NOT in the driver JVM.


 Thanks

 On Sat, May 7, 2016 at 11:58 AM, Ted Yu  wrote:

> bq.   at akka.serialization.JavaSerializer.toBinary(
> Serializer.scala:129)
>
> It was Akka which uses JavaSerializer
>
> Cheers
>
> On Sat, May 7, 2016 at 11:13 AM, Nirav Patel 
> wrote:
>
>> Hi,
>>
>> I thought I was using kryo serializer for shuffle.  I could verify it
>> from spark UI - Environment tab that
>> spark.serializer org.apache.spark.serializer.KryoSerializer
>> spark.kryo.registrator
>> com.myapp.spark.jobs.conf.SparkSerializerRegistrator
>>
>>
>> But when I see following error in Driver logs it looks like spark is
>> using JavaSerializer
>>
>> 2016-05-06 09:49:26,490 ERROR
>> [sparkDriver-akka.actor.default-dispatcher-17] 
>> akka.actor.ActorSystemImpl:
>> Uncaught fatal error from thread
>> [sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down
>> ActorSystem [sparkDriver]
>>
>> java.lang.OutOfMemoryError: Java heap space
>>
>> at java.util.Arrays.copyOf(Arrays.java:2271)
>>
>> at
>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>>
>> at
>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>>
>> at
>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>>
>> at
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
>>
>> at
>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
>>
>> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
>>
>> at
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)

Re: How to verify if spark is using kryo serializer for shuffle

2016-05-08 Thread Nirav Patel
Yes, I am using YARN client mode, hence I specified AM settings too.
What do you mean, akka is moved out of the picture? I am using spark 2.5.1

Sent from my iPhone

> On May 8, 2016, at 6:39 AM, Ted Yu  wrote:
> 
> Are you using YARN client mode ?
> 
> See
> https://spark.apache.org/docs/latest/running-on-yarn.html
> 
> In cluster mode, spark.yarn.am.memory is not effective.
> 
> For Spark 2.0, akka is moved out of the picture.
> FYI
> 
>> On Sat, May 7, 2016 at 8:24 PM, Nirav Patel  wrote:
>> I have 20 executors, 6 cores each. There are 5 stages in total; it fails on the 
>> 5th one. All of them have 6474 tasks. The 5th stage is a count operation, and it 
>> also performs aggregateByKey as part of its lazy evaluation.
>> I am setting:
>> spark.driver.memory=10G, spark.yarn.am.memory=2G and 
>> spark.driver.maxResultSize=9G 
>> 
>> 
>> On a side note, could it be something to do with java serialization library, 
>> ByteArrayOutputStream using byte array? Can it be replaced by some better 
>> serializing library?
>> 
>> https://bugs.openjdk.java.net/browse/JDK-8055949
>> https://bugs.openjdk.java.net/browse/JDK-8136527
>> 
>> Thanks
>> 
>>> On Sat, May 7, 2016 at 4:51 PM, Ashish Dubey  wrote:
>>> The driver maintains the complete metadata of the application (scheduling of 
>>> executors and maintaining the messaging to control the execution). This code 
>>> seems to be failing in that code path. That said, there is JVM overhead based 
>>> on the number of executors, stages, and tasks in your app. Do you know your 
>>> driver heap size and application structure (number of stages and tasks)?
>>> 
>>> Ashish 
>>> 
 On Saturday, May 7, 2016, Nirav Patel  wrote:
 Right, but these logs are from the Spark driver, and the driver seems to use Akka.
 
 ERROR [sparkDriver-akka.actor.default-dispatcher-17] 
 akka.actor.ActorSystemImpl: Uncaught fatal error from thread 
 [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down 
 ActorSystem [sparkDriver]
 
 I saw following logs before above happened.
 
 2016-05-06 09:49:17,813 INFO 
 [sparkDriver-akka.actor.default-dispatcher-17] 
 org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output 
 locations for shuffle 1 to hdn6.xactlycorporation.local:44503
 
 
 
 As far as I know, the driver is just driving the shuffle operation, not actually 
 doing anything within its own system that would cause a memory issue. Can you 
 explain in what circumstances I could see this error in the driver logs? I don't 
 do any collect or any other driver operation that would cause this. It fails when 
 doing an aggregateByKey operation, but that should happen in the executor JVM, 
 NOT in the driver JVM.
 
 
 
 Thanks
 
 
> On Sat, May 7, 2016 at 11:58 AM, Ted Yu  wrote:
> bq.   at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
> 
> It was Akka which uses JavaSerializer
> 
> Cheers
> 
>> On Sat, May 7, 2016 at 11:13 AM, Nirav Patel  
>> wrote:
>> Hi,
>> 
>> I thought I was using kryo serializer for shuffle.  I could verify it 
>> from spark UI - Environment tab that 
>> spark.serializer org.apache.spark.serializer.KryoSerializer
>> spark.kryo.registrator   
>> com.myapp.spark.jobs.conf.SparkSerializerRegistrator
>> 
>> 
>> But when I see following error in Driver logs it looks like spark is 
>> using JavaSerializer 
>> 
>> 2016-05-06 09:49:26,490 ERROR 
>> [sparkDriver-akka.actor.default-dispatcher-17] 
>> akka.actor.ActorSystemImpl: Uncaught fatal error from thread 
>> [sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down 
>> ActorSystem [sparkDriver]
>> 
>> java.lang.OutOfMemoryError: Java heap space
>> 
>> at java.util.Arrays.copyOf(Arrays.java:2271)
>> 
>> at 
>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>> 
>> at 
>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>> 
>> at 
>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>> 
>> at 
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
>> 
>> at 
>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
>> 
>> at 
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
>> 
>> at 
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>> 
>> at 
>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
>> 
>> at 
>> 

Re: Correct way of setting executor numbers and executor cores in Spark 1.6.1 for non-clustered mode ?

2016-05-08 Thread Mich Talebzadeh
Hi Karen,

You mentioned:

"So if I'm reading your email correctly it sounds like I should be able to
increase the number of executors on local mode by adding hostnames for
localhost.
and cores per executor with SPARK_EXECUTOR_CORES.
And by starting master/slave(s) for local host I can access webui to see
whats going on."

The problem is that Spark is memory-hungry, so to speak. You can add more
workers (slaves) to the list, but if you don't have enough memory they will
just queue up and won't do much, I am afraid.

So out of 16GB RAM you have 6GB free.

For example, in mine I have 8.9GB free (out of 24GB).


free
             total       used       free     shared    buffers     cached
Mem:      24546308   23653300     893008          0    1163364   15293448
-/+ buffers/cache:    7196488   17349820
Swap:      2031608    2029184       2424

Now you can get more details of resources by doing the following


cat /proc/meminfo

MemTotal:       24546308 kB
MemFree:          898488 kB
Buffers:         1163732 kB
Cached:         15293628 kB
SwapCached:       231924 kB
Active:         17129696 kB
Inactive:        5553284 kB
HighTotal:             0 kB
HighFree:              0 kB
LowTotal:       24546308 kB
LowFree:          898488 kB
SwapTotal:       2031608 kB
SwapFree:           2424 kB
Dirty:               408 kB
Writeback:             0 kB
AnonPages:       5693420 kB
Mapped:          7643580 kB
Slab:             702452 kB
PageTables:       182760 kB
NFS_Unstable:          0 kB
Bounce:                0 kB
CommitLimit:    14304760 kB
Committed_AS:   36210252 kB
VmallocTotal:   34359738367 kB
VmallocUsed:        1760 kB
VmallocChunk:   34359736355 kB
HugePages_Total:       0
HugePages_Free:        0
HugePages_Rsvd:        0
Hugepagesize:       2048 kB

So you can see where you are. To be honest, from my experience it sounds like
you have enough resources for one container, in other words one job with one
executor, so you may have to live with it.

The command

cat /proc/cpuinfo

will also tell you how many cores you have, and of course a summary is
available from the top command.

So I am not sure there is much you can do about it, except trying to get as
much as you can.

BTW are you also running hive on this host or any other database?

ipcs -m

-- Shared Memory Segments --
key        shmid      owner   perms   bytes        nattch   status
0x00000000 32768      gdm     600     393216       2        dest
0x00000000 3080193    oracle  640     4517888      212
0x00000000 3112962    oracle  640     7566524416   106
0x00000000 3145731    oracle  640     12259328     106
0x5b266174 3178500    oracle  640     36864        106
0xe916010c 103841797  sybase  600     1572864      1

Anyway, post the output and we'll see how we can all help you.

HTH


Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 8 May 2016 at 12:34, Karen Murphy  wrote:

>
> Hi Mich,
>
> I have just seen your reply for the first time. I don't know why I can't see
> it on the online mailing list; it is possibly just delayed. I kept checking
> it for replies rather than logging in for emails. Thanks, I will try what you
> suggest.
>
> So if I'm reading your email correctly, it sounds like I should be able to
> increase the number of executors in local mode by adding hostnames for
> localhost, and the cores per executor with SPARK_EXECUTOR_CORES.
> And by starting a master/slave(s) for localhost I can access the web UI to
> see what's going on.
>
> I did check 'free', and it looks like there is a lot less free memory (out
> of the total of 16GB) than I had thought. In fact just under 6GB, with no
> jobs currently running, and just the hadoop daemons.
>
> Will reply to your email on the list when it appears, to report results
> when I have them.
>
> Thanks for the help and sorry for the confusion,
> Karen
>
>
> 
> From: Mich Talebzadeh [mich.talebza...@gmail.com]
> Sent: 07 May 2016 15:01
> To: Karen Murphy
> Cc: user @spark
> Subject: Re: Correct way of setting executor numbers and executor cores in
> Spark 1.6.1 for non-clustered mode ?
>
> Check how much free memory you have on your host
>
> /usr/bin/free
>
>
> As heuristic values, start with these in conf/spark-env.sh:
>
> export SPARK_EXECUTOR_CORES=4   ## Number of cores for the workers (Default: 1)
> export SPARK_EXECUTOR_MEMORY=8G ## Memory per worker (e.g. 1000M, 2G) (Default: 1G)
> export SPARK_DRIVER_MEMORY=1G   ## Memory for the master (e.g. 1000M, 2G) (Default: 512 MB)
>
> Then add another worker process by adding your standalone hostname to
> conf/slaves. That will create two worker processes on that hostname.
>
> do sbin/start-master.sh (if not started) and do start-slaves.sh.
>
> Log in to the Spark web UI for the job at hostname:4040/executors/
>
> And test your jobs for timing and 

Re: How to verify if spark is using kryo serializer for shuffle

2016-05-08 Thread Ted Yu
Are you using YARN client mode ?

See
https://spark.apache.org/docs/latest/running-on-yarn.html

In cluster mode, spark.yarn.am.memory is not effective.

For Spark 2.0, akka is moved out of the picture.
FYI

On Sat, May 7, 2016 at 8:24 PM, Nirav Patel  wrote:

> I have 20 executors, 6 cores each. There are 5 stages in total; it fails on
> the 5th one. All of them have 6474 tasks. The 5th stage is a count operation,
> and it also performs aggregateByKey as part of its lazy evaluation.
> I am setting:
> spark.driver.memory=10G, spark.yarn.am.memory=2G and
> spark.driver.maxResultSize=9G
>
>
> On a side note, could it be something to do with java serialization
> library, ByteArrayOutputStream using byte array? Can it be replaced by
> some better serializing library?
>
> https://bugs.openjdk.java.net/browse/JDK-8055949
> https://bugs.openjdk.java.net/browse/JDK-8136527
>
> Thanks
>
> On Sat, May 7, 2016 at 4:51 PM, Ashish Dubey  wrote:
>
>> The driver maintains the complete metadata of the application (scheduling of
>> executors and maintaining the messaging to control the execution). This code
>> seems to be failing in that code path. That said, there is JVM overhead based
>> on the number of executors, stages, and tasks in your app. Do you know your
>> driver heap size and application structure (number of stages and tasks)?
>>
>> Ashish
>>
>> On Saturday, May 7, 2016, Nirav Patel  wrote:
>>
>>> Right, but these logs are from the Spark driver, and the driver seems to use Akka.
>>>
>>> ERROR [sparkDriver-akka.actor.default-dispatcher-17]
>>> akka.actor.ActorSystemImpl: Uncaught fatal error from thread
>>> [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down
>>> ActorSystem [sparkDriver]
>>>
>>> I saw following logs before above happened.
>>>
>>> 2016-05-06 09:49:17,813 INFO
>>> [sparkDriver-akka.actor.default-dispatcher-17]
>>> org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output
>>> locations for shuffle 1 to hdn6.xactlycorporation.local:44503
>>>
>>>
>>> As far as I know, the driver is just driving the shuffle operation, not
>>> actually doing anything within its own system that would cause a memory issue.
>>> Can you explain in what circumstances I could see this error in the driver
>>> logs? I don't do any collect or any other driver operation that would cause
>>> this. It fails when doing an aggregateByKey operation, but that should happen
>>> in the executor JVM, NOT in the driver JVM.
>>>
>>>
>>> Thanks
>>>
>>> On Sat, May 7, 2016 at 11:58 AM, Ted Yu  wrote:
>>>
 bq.   at akka.serialization.JavaSerializer.toBinary(
 Serializer.scala:129)

 It was Akka which uses JavaSerializer

 Cheers

 On Sat, May 7, 2016 at 11:13 AM, Nirav Patel 
 wrote:

> Hi,
>
> I thought I was using kryo serializer for shuffle.  I could verify it
> from spark UI - Environment tab that
> spark.serializer org.apache.spark.serializer.KryoSerializer
> spark.kryo.registrator
> com.myapp.spark.jobs.conf.SparkSerializerRegistrator
>
>
> But when I see following error in Driver logs it looks like spark is
> using JavaSerializer
>
> 2016-05-06 09:49:26,490 ERROR
> [sparkDriver-akka.actor.default-dispatcher-17] akka.actor.ActorSystemImpl:
> Uncaught fatal error from thread
> [sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down
> ActorSystem [sparkDriver]
>
> java.lang.OutOfMemoryError: Java heap space
>
> at java.util.Arrays.copyOf(Arrays.java:2271)
>
> at
> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>
> at
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>
> at
> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>
> at
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
>
> at
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
>
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
>
> at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>
> at
> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
>
> at
> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
>
> at
> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
>
> at
> scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>
> at
> akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
>
> at
> akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
>
> at
> 

Re: Is it a bug?

2016-05-08 Thread Ted Yu
I don't think so. 
RDD is immutable. 

> On May 8, 2016, at 2:14 AM, Sisyphuss  wrote:
> 
>  
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-a-bug-tp26898.html



Is it a bug?

2016-05-08 Thread Sisyphuss
 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-a-bug-tp26898.html



Re: pyspark dataframe sort issue

2016-05-08 Thread Buntu Dev
Thanks Davies. After I did a coalesce(1) to save it as a single parquet file,
I was able to get head() to return the correct order.
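
For reference, a sketch of that write (shown in Scala for consistency with the
rest of this digest; the PySpark calls are analogous). With many part files
the read order across files is undefined, so a single output file is what
preserves the global sort on read:

df.sort(df("value").desc)
  .coalesce(1)                      // one output file => one well-defined read order
  .write.parquet("/sorted/file.parquet")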

On Sun, May 8, 2016 at 12:29 AM, Davies Liu  wrote:

> When you have multiple parquet files, the order of all the rows in
> them is not defined.
>
> On Sat, May 7, 2016 at 11:48 PM, Buntu Dev wrote:
> > I'm using the pyspark dataframe API to sort by a specific column and then
> > save the dataframe as a parquet file. But the resulting parquet file
> > doesn't seem to be sorted.
> > Applying sort and doing a head() on the results shows the correct results
> > sorted by 'value' column in desc order, as shown below:
> >
> > ~
> >>>df=sqlContext.read.parquet("/some/file.parquet")
> >>>df.printSchema()
> >
> > root
> >  |-- c1: string (nullable = true)
> >  |-- c2: string (nullable = true)
> >  |-- value: double (nullable = true)
> >
> >>>df.sort(df.value.desc()).head(3)
> >
> > [Row(c1=u'546', c2=u'234', value=1020.25), Row(c1=u'3212', c2=u'6785',
> > value=890.6), Row(c1=u'546', c2=u'234', value=776.45)]
> > ~~
> >
> > But saving the sorted dataframe as parquet and fetching the first N rows
> > using head() doesn't seem to return the results ordered by 'value'
> column:
> >
> > 
> >>>df=sqlContext.read.parquet("/some/file.parquet")
> >>>df.sort(df.value.desc()).write.parquet("/sorted/file.parquet")
> > ...
> >>>df2=sqlContext.read.parquet("/sorted/file.parquet")
> >>>df2.head(3)
> >
> > [Row(c1=u'444', b2=u'233', value=0.024120907), Row(c1=u'5672',
> c2=u'9098',
> > value=0.024120906), Row(c1=u'546', c2=u'234', value=0.024120905)]
> > 
> >
> > How do I go about sorting and saving a sorted dataframe?
> >
> >
> > Thanks!
>


Re: pyspark dataframe sort issue

2016-05-08 Thread Davies Liu
When you have multiple parquet files, the order of all the rows in
them is not defined.

On Sat, May 7, 2016 at 11:48 PM, Buntu Dev  wrote:
> I'm using the pyspark dataframe API to sort by a specific column and then save
> the dataframe as a parquet file. But the resulting parquet file doesn't seem
> to be sorted.
>
> Applying sort and doing a head() on the results shows the correct results
> sorted by 'value' column in desc order, as shown below:
>
> ~
>>>df=sqlContext.read.parquet("/some/file.parquet")
>>>df.printSchema()
>
> root
>  |-- c1: string (nullable = true)
>  |-- c2: string (nullable = true)
>  |-- value: double (nullable = true)
>
>>>df.sort(df.value.desc()).head(3)
>
> [Row(c1=u'546', c2=u'234', value=1020.25), Row(c1=u'3212', c2=u'6785',
> value=890.6), Row(c1=u'546', c2=u'234', value=776.45)]
> ~~
>
> But saving the sorted dataframe as parquet and fetching the first N rows
> using head() doesn't seem to return the results ordered by 'value' column:
>
> 
>>>df=sqlContext.read.parquet("/some/file.parquet")
>>>df.sort(df.value.desc()).write.parquet("/sorted/file.parquet")
> ...
>>>df2=sqlContext.read.parquet("/sorted/file.parquet")
>>>df2.head(3)
>
> [Row(c1=u'444', b2=u'233', value=0.024120907), Row(c1=u'5672', c2=u'9098',
> value=0.024120906), Row(c1=u'546', c2=u'234', value=0.024120905)]
> 
>
> How do I go about sorting and saving a sorted dataframe?
>
>
> Thanks!




Re: sqlCtx.read.parquet yields lots of small tasks

2016-05-08 Thread Johnny W.
The file size is very small (< 1M). The stage launches every time I call:
--
sqlContext.read.parquet(path_to_file)

These are the parquet specific configurations I set:
--
spark.sql.parquet.filterPushdown: true
spark.sql.parquet.mergeSchema: true

Thanks,
J.

On Sat, May 7, 2016 at 4:20 PM, Ashish Dubey  wrote:

> How big is your file and can you also share the code snippet
>
>
> On Saturday, May 7, 2016, Johnny W.  wrote:
>
>> hi spark-user,
>>
>> I am using Spark 1.6.0. When I call sqlCtx.read.parquet to create a
>> dataframe from a parquet data source with a single parquet file, it yields
>> a stage with lots of small tasks. It seems the number of tasks depends on
>> how many executors I have instead of how many parquet files/partitions I
>> have. Actually, it launches 5 tasks on each executor.
>>
>> This behavior is quite strange, and it may cause problems if there is
>> a slow executor. What is this "parquet" stage for, and why does it launch 5
>> tasks on each executor?
>>
>> Thanks,
>> J.
>>
>


pyspark dataframe sort issue

2016-05-08 Thread Buntu Dev
I'm using the pyspark dataframe API to sort by a specific column and then save
the dataframe as a parquet file. But the resulting parquet file doesn't seem
to be sorted.

Applying sort and doing a head() on the results shows the correct results
sorted by 'value' column in desc order, as shown below:

~
>>df=sqlContext.read.parquet("/some/file.parquet")
>>df.printSchema()

root
 |-- c1: string (nullable = true)
 |-- c2: string (nullable = true)
 |-- value: double (nullable = true)

>>df.sort(df.value.desc()).head(3)

[Row(c1=u'546', c2=u'234', value=1020.25), Row(c1=u'3212', c2=u'6785',
value=890.6), Row(c1=u'546', c2=u'234', value=776.45)]
~~

But saving the sorted dataframe as parquet and fetching the first N rows
using head() doesn't seem to return the results ordered by 'value' column:


>>df=sqlContext.read.parquet("/some/file.parquet")
>>df.sort(df.value.desc()).write.parquet("/sorted/file.parquet")
...
>>df2=sqlContext.read.parquet("/sorted/file.parquet")
>>df2.head(3)

[Row(c1=u'444', b2=u'233', value=0.024120907), Row(c1=u'5672', c2=u'9098',
value=0.024120906), Row(c1=u'546', c2=u'234', value=0.024120905)]


How do I go about sorting and saving a sorted dataframe?


Thanks!


Joining a RDD to a Dataframe

2016-05-08 Thread Cyril Scetbon
Hi,

I have an RDD built during a Spark Streaming job and I'd like to join it to a 
DataFrame (E/S input) to enrich it.
It seems that I can't join the RDD and the DF without first converting the RDD 
to a DF (tell me if I'm wrong). Here are the schemas of both DFs:

scala> df
res32: org.apache.spark.sql.DataFrame = [f1: string, addresses: 
array>, id: string]

scala> df_input
res33: org.apache.spark.sql.DataFrame = [id: string]

scala> df_input.collect
res34: Array[org.apache.spark.sql.Row] = Array([idaddress2], [idaddress12])

I can get the ids I want if I know the value to look for in addresses.id using:

scala> df.filter(array_contains(df("addresses.id"), 
"idaddress2")).select("id").collect
res35: Array[org.apache.spark.sql.Row] = Array([], [YY])

However, when I try to join df_input and df using the previous filter as 
the join condition, I get an exception:

scala> df.join(df_input, array_contains(df("adresses.id"), df_input("id")))
java.lang.RuntimeException: Unsupported literal type class 
org.apache.spark.sql.Column id
at 
org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:50)
at org.apache.spark.sql.functions$.array_contains(functions.scala:2452)
...

It seems that array_contains only supports literal arguments and does not 
resolve a sql.Column to its value.

What's the best way to achieve what I want to do? (Also speaking in terms of 
performance.)

Thanks