Re: Confusing RDD function

2016-03-08 Thread Hemminger Jeff
Thank you, yes that makes sense.
I was aware of transformations and actions, but did not realize foreach was
an action. I've found the exhaustive list here
http://spark.apache.org/docs/latest/programming-guide.html#actions
and it's clear to me again.

Thank you for your help!

On Wed, Mar 9, 2016 at 11:37 AM, Jakob Odersky  wrote:

> Hi Jeff,
>
> > But in our development environment, the returned RDD results were empty
> and b.function(_) was never executed
> what do you mean by "the returned RDD results were empty", did you try
> running a foreach, collect or any other action on the returned RDD[C]?
>
> Spark provides two kinds of operations on RDDs:
> 1. transformations, which return a new RDD and are evaluated lazily, and
> 2. actions, which actually compute the RDD and return some kind of result.
> In your example above, 'map' is a transformation and thus is not
> actually applied until some action (like 'foreach') is called on the
> resulting RDD.
> You can find more information in the Spark Programming Guide
> http://spark.apache.org/docs/latest/programming-guide.html#rdd-operations.
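>
> A minimal sketch of the difference (assuming sc is your SparkContext; the
> values are placeholders, not your actual types):
>
>   val nums = sc.parallelize(1 to 5)          // RDD[Int]
>   val doubled = nums.map(_ * 2)              // transformation: nothing runs yet
>   doubled.foreach(n => println(n))           // action: triggers the computation
>   val arr = doubled.collect()                // another action: returns Array[Int]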
>
> best,
> --Jakob
>
> On Tue, Mar 8, 2016 at 5:41 PM, Hemminger Jeff  wrote:
> >
> > I'm currently developing a Spark Streaming application.
> >
> > I have a function that receives an RDD and an object instance as a
> > parameter, and returns an RDD:
> >
> > def doTheThing(a: RDD[A], b: B): RDD[C]
> >
> >
> > Within the function, I do some processing within a map of the RDD.
> > Like this:
> >
> >
> > def doTheThing(a: RDD[A], b: B): RDD[C] = {
> >
> >   a.combineByKey(...).map(b.function(_))
> >
> > }
> >
> >
> > I combine the RDD by key, then map the results calling a function of
> > instance b, and return the results.
> >
> > Here is where I ran into trouble.
> >
> > In a unit test running Spark in memory, I was able to convince myself
> that
> > this worked well.
> >
> > But in our development environment, the returned RDD results were empty
> and
> > b.function(_) was never executed.
> >
> > However, when I added an otherwise useless foreach:
> >
> >
> > def doTheThing(a: RDD[A], b: B): RDD[C] = {
> >
> >   val results = a.combineByKey(...).map(b.function(_))
> >
> >   results.foreach( p => p )
> >
> >   results
> >
> > }
> >
> >
> > Then it works.
> >
> > So, basically, adding an extra foreach iteration appears to cause
> > b.function(_) to execute and return results correctly.
> >
> > I find this confusing. Can anyone shed some light on why this would be?
> >
> > Thank you,
> > Jeff
> >
> >
> >
>


Confusing RDD function

2016-03-08 Thread Hemminger Jeff
I'm currently developing a Spark Streaming application.

I have a function that receives an RDD and an object instance as a
parameter, and returns an RDD:

def doTheThing(a: RDD[A], b: B): RDD[C]


Within the function, I do some processing within a map of the RDD.
Like this:


def doTheThing(a: RDD[A], b: B): RDD[C] = {

  a.combineByKey(...).map(b.function(_))

}


I combine the RDD by key, then map the results calling a function of
instance b, and return the results.

Here is where I ran into trouble.

In a unit test running Spark in memory, I was able to convince myself that
this worked well.

But in our development environment, the returned RDD results were empty and
b.function(_) was never executed.

However, when I added an otherwise useless foreach:


def doTheThing(a: RDD[A], b: B): RDD[C] = {

  val results = a.combineByKey(...).map(b.function(_))

  results.foreach( p => p )

  results

}


Then it works.

So, basically, adding an extra foreach iteration appears to cause
b.function(_) to execute and return results correctly.

I find this confusing. Can anyone shed some light on why this would be?

Thank you,
Jeff


Re: String operation in filter with a special character

2015-10-05 Thread Hemminger Jeff
Thank you!

On Tue, Oct 6, 2015 at 4:50 AM, Michael Armbrust 
wrote:

> Double quotes (") are used to create string literals in HiveQL / Spark
> SQL.  So you are asking if the string A+B equals the number 2.0.
>
> You should use backticks (`) to escape weird characters in column names.
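>
> For example, with the column name from your snippet:
>
>   dataFrame.filter("`A+B` = 2.0").show()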
>
> On Mon, Oct 5, 2015 at 12:59 AM, Hemminger Jeff  wrote:
>
>> I have a rather odd use case. I have a DataFrame column name with a '+'
>> character in it.
>> The app performs some processing steps before determining the column
>> name, and it
>> would be much easier to code if I could use the DataFrame filter
>> operations with a String.
>>
>> This demonstrates the issue I am having:
>>
>> dataFrame.filter(renamed("A+B").equalTo(2.0)).show()
>>
>> This will return all rows with the column value matching 2.0, as expected.
>>
>> dataFrame.filter("\"A+B\"=2.0").show()
>>
>> This executes but does not return the correct results. It returns an
>> empty result.
>>
>> dataFrame.filter("\"A+C\"=2.0").show()
>>
>> Referencing a non-existent column name returns the same empty result.
>>
>> Any suggestions?
>>
>> Jeff
>>
>
>


Re: spark-ec2 config files.

2015-10-05 Thread Hemminger Jeff
The spark-ec2 script generates spark config files from templates. Those are
located here:
https://github.com/amplab/spark-ec2/tree/branch-1.5/templates/root/spark/conf
Note the link is referring to the 1.5 branch.
Is this what you are looking for?
Jeff

On Mon, Oct 5, 2015 at 8:56 AM, Renato Perini 
wrote:

> Can someone provide the relevant config files generated by the spark-ec2
> script?
> I'm configuring a Spark cluster on EC2 manually, and I would like to
> compare my config files (spark-defaults.conf, spark-env.sh) with those
> generated by the spark-ec2 script.
> Of course, hide your sensitive information.
>
> Thank you.
>
>
>
>


String operation in filter with a special character

2015-10-04 Thread Hemminger Jeff
I have a rather odd use case. I have a DataFrame column name with a '+'
character in it.
The app performs some processing steps before determining the column name,
and it
would be much easier to code if I could use the DataFrame filter operations
with a String.

This demonstrates the issue I am having:

dataFrame.filter(renamed("A+B").equalTo(2.0)).show()

This will return all rows with the column value matching 2.0, as expected.

dataFrame.filter("\"A+B\"=2.0").show()

This executes but does not return the correct results. It returns an empty
result.

dataFrame.filter("\"A+C\"=2.0").show()

Referencing a non-existent column name returns the same empty result.

Any suggestions?

Jeff


What happens when cache is full?

2015-09-12 Thread Hemminger Jeff
I am trying to understand the process of caching and specifically what the
behavior is when the cache is full. Please excuse me if this question is a
little vague; I am still building my understanding of this process.

I have an RDD that I perform several computations with; I persist it with
MEMORY_ONLY_SER before performing the computations.
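
Roughly what I am doing (loadData and the filter predicates below are
placeholders for my actual code):

  import org.apache.spark.storage.StorageLevel

  val rdd = loadData(sc)                          // expensive to build
  rdd.persist(StorageLevel.MEMORY_ONLY_SER)       // serialized, memory-only cache
  val countA = rdd.filter(isTypeA).count()        // first computation
  val countB = rdd.filter(isTypeB).count()        // later computations should reuse the cache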

I believe that, due to insufficient memory, it is recomputing (at least
part of) the RDD each time.

Logging shows that the RDD was not cached previously, and therefore needs
to be computed.

I looked at the Spark BlockManager code and see that getOrCompute attempts
to retrieve the block from the cache. If it is not available, it computes it.

Can I assume that when Spark attempts to cache an RDD but runs out of
memory, it recomputes a part of the RDD each time it is read?

I think I might be incorrect in this assumption, because I would have
expected a warning message if the cache was out of memory.

Thanks,
Jeff


Re: Alternative to Large Broadcast Variables

2015-08-29 Thread Hemminger Jeff
Thanks for the recommendations. I had been focused on solving the problem
"within Spark" but a distributed database sounds like a better solution.

Jeff

On Sat, Aug 29, 2015 at 11:47 PM, Ted Yu  wrote:

> Not sure if the race condition you mentioned is related to Cassandra's
> data consistency model.
>
> If HBase is used as the external key value store, atomicity is guaranteed.
>
> Cheers
>
> On Sat, Aug 29, 2015 at 7:40 AM, Raghavendra Pandey <
> raghavendra.pan...@gmail.com> wrote:
>
>> We are using Cassandra for a similar kind of problem and it works well...
>> You need to take care of the race condition between updating the store and
>> looking up the store...
>> On Aug 29, 2015 1:31 AM, "Ted Yu"  wrote:
>>
>>> +1 on Jason's suggestion.
>>>
>>> bq. this large variable is broadcast many times during the lifetime
>>>
>>> Please consider making this large variable more granular. Meaning,
>>> reduce the amount of data transferred between the key value store and
>>> your app during update.
>>>
>>> Cheers
>>>
>>> On Fri, Aug 28, 2015 at 12:44 PM, Jason  wrote:
>>>
>>>> You could try using an external key value store (like HBase, Redis) and
>>>> perform lookups/updates inside of your mappers (you'd need to create the
>>>> connection within a mapPartitions code block to avoid the connection
>>>> setup/teardown overhead)?
>>>>
>>>> I haven't done this myself though, so I'm just throwing the idea out
>>>> there.
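> >>>>
> >>>> Something along these lines (KVClient, storeHost and entries are
> >>>> hypothetical stand-ins for whatever store and RDD you end up using):
> >>>>
> >>>>   val enriched = entries.mapPartitions { iter =>
> >>>>     val client = KVClient.connect(storeHost)          // one connection per partition
> >>>>     val results = iter.map(e => (e, client.get(e.key))).toList
> >>>>     client.close()                                    // safe: partition already materialized
> >>>>     results.iterator
> >>>>   }
> >>>>
> >>>> (Materializing with toList keeps the connection usable for every lookup
> >>>> before it is closed; for very large partitions you'd want something more
> >>>> incremental.)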
>>>>
>>>> On Fri, Aug 28, 2015 at 3:39 AM Hemminger Jeff 
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
> >>>>> I am working on a Spark application that uses a large (~3G)
>>>>> broadcast variable as a lookup table. The application refines the data in
>>>>> this lookup table in an iterative manner. So this large variable is
>>>>> broadcast many times during the lifetime of the application process.
>>>>>
> >>>>> From what I have observed, perhaps 60% of the execution time is spent
> >>>>> waiting for the variable to broadcast in each iteration. My reading of a
> >>>>> Spark performance article[1] suggests that the time spent broadcasting
> >>>>> will increase with the number of nodes I add.
>>>>>
>>>>> My question for the group - what would you suggest as an alternative
>>>>> to broadcasting a large variable like this?
>>>>>
> >>>>> One approach I have considered is segmenting my RDD and adding a copy
> >>>>> of the lookup table for each X number of values to process. So, for
> >>>>> example, if I have a list of 1 million entries to process (e.g.,
> >>>>> RDD[Entry]), I could split this into segments of 100K entries, with a
> >>>>> copy of the lookup table, and make that an RDD[(Lookup, Array[Entry])].
>>>>>
> >>>>> Another solution I am looking at is making the lookup table an RDD
> >>>>> instead of a broadcast variable. Perhaps I could use an IndexedRDD[2] to
> >>>>> improve performance. One issue with this approach is that I would have to
> >>>>> rewrite my application code to use two RDDs so that I do not reference the
> >>>>> lookup RDD from within the closure of another RDD.
>>>>>
>>>>> Any other recommendations?
>>>>>
>>>>> Jeff
>>>>>
>>>>>
>>>>> [1]
>>>>> http://www.cs.berkeley.edu/~agearh/cs267.sp10/files/mosharaf-spark-bc-report-spring10.pdf
>>>>>
>>>>> [2]https://github.com/amplab/spark-indexedrdd
>>>>>
>>>>
>>>
>


Alternative to Large Broadcast Variables

2015-08-28 Thread Hemminger Jeff
Hi,

I am working on a Spark application that uses a large (~3G)
broadcast variable as a lookup table. The application refines the data in
this lookup table in an iterative manner. So this large variable is
broadcast many times during the lifetime of the application process.
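
The shape of the loop is roughly this (LookupTable, initialLookup, refine,
data and numIterations are placeholders for my actual types and code):

  var lookup: LookupTable = initialLookup            // the ~3G lookup table
  for (_ <- 1 to numIterations) {
    val bc = sc.broadcast(lookup)                    // re-broadcast every iteration
    lookup = refine(data, bc)                        // produces the refined table
    bc.unpersist()                                   // release the previous copy
  }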

From what I have observed, perhaps 60% of the execution time is spent
waiting for the variable to broadcast in each iteration. My reading of a
Spark performance article[1] suggests that the time spent broadcasting will
increase with the number of nodes I add.

My question for the group - what would you suggest as an alternative to
broadcasting a large variable like this?

One approach I have considered is segmenting my RDD and adding a copy of
the lookup table for each X number of values to process. So, for example,
if I have a list of 1 million entries to process (e.g., RDD[Entry]), I could
split this into segments of 100K entries, with a copy of the lookup table,
and make that an RDD[(Lookup, Array[Entry])].

Another solution I am looking at is making the lookup table an RDD
instead of a broadcast variable. Perhaps I could use an IndexedRDD[2] to
improve performance. One issue with this approach is that I would have to
rewrite my application code to use two RDDs so that I do not reference the
lookup RDD from within the closure of another RDD.

Any other recommendations?

Jeff


[1]
http://www.cs.berkeley.edu/~agearh/cs267.sp10/files/mosharaf-spark-bc-report-spring10.pdf

[2]https://github.com/amplab/spark-indexedrdd