Re: Cassandra row count grouped by multiple columns

2015-09-11 Thread Eric Walker
Hi Chirag,

Maybe something like this?

import org.apache.spark.sql._
import org.apache.spark.sql.types._

val rdd = sc.parallelize(Seq(
  Row("A1", "B1", "C1"),
  Row("A2", "B2", "C2"),
  Row("A3", "B3", "C2"),
  Row("A1", "B1", "C1")
))

val schema = StructType(Seq("a", "b", "c").map(c => StructField(c, StringType)))
val df = sqlContext.createDataFrame(rdd, schema)

df.registerTempTable("rows")
sqlContext.sql("select a, b, c, count(0) as count from rows group by
a, b, c").collect()


Eric


On Thu, Sep 10, 2015 at 2:19 AM, Chirag Dewan 
wrote:

> Hi,
>
>
>
> I am using Spark 1.2.0 with Cassandra 2.0.14. I have a problem where I
> need a count of rows for each distinct combination of several columns.
>
>
>
> So I have a column family with 3 columns, i.e. a, b, c, and for each
> distinct combination of values (a1, b1, c1) I want the row count.
>
>
>
> For example:
>
> A1,B1,C1
>
> A2,B2,C2
>
> A3,B3,C2
>
> A1,B1,C1
>
>
>
> The output should be:
>
> A1,B1,C1,2
>
> A2,B2,C2,1
>
> A3,B3,C2,1
>
>
>
> What is the optimum way of achieving this?
>
>
>
> Thanks in advance.
>
>
>
> Chirag
>


Re: spark.shuffle.spill=false ignored?

2015-09-09 Thread Eric Walker
Hi Richard,

I've stepped away from this issue since I raised my question.  One detail
that was unknown at the time: the application did not run out of disk space
every time it spilled to disk; that appears to have been a one-off problem.
The main challenge was that the spark.shuffle.spill setting seemed to be
ignored.  That might have been the expected behavior given the skew in the
data.

More generally, attempts to tune the application with settings such as
spark.python.worker.memory and spark.shuffle.memoryFraction had no
observable effect.  It is possible that the apparently ignored
spark.shuffle.spill setting was just one symptom of a larger
misconfiguration.
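
As an aside, when a setting appears to be ignored it can be worth printing
the configuration the driver actually ended up with, to confirm that the
setting made it through at all.  A quick check from a Scala shell (a similar
check is possible in PySpark via the SparkConf used to build the context):

// Print every setting the active SparkContext is actually using, to see
// whether spark.shuffle.spill and the other settings were actually set.
sc.getConf.getAll.sorted.foreach { case (k, v) => println(s"$k=$v") }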

Eric


On Wed, Sep 9, 2015 at 4:48 PM, Richard Marscher <rmarsc...@localytics.com>
wrote:

> Hi Eric,
>
> I just wanted to do a sanity check: do you know what paths it is trying to
> write to? I ask because even without spilling, shuffles always write to
> disk first before transferring data across the network. I ran into this
> myself at one point, when we accidentally had /tmp mounted on a tiny disk
> and kept running out of disk on shuffles even though we also don't spill.
> You may have already considered or ruled this out though.
>
> On Thu, Sep 3, 2015 at 12:56 PM, Eric Walker <eric.wal...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am using Spark 1.3.1 on EMR with lots of memory.  I have attempted to
>> run a large pyspark job several times, specifying
>> `spark.shuffle.spill=false` in different ways.  It seems that the setting
>> is ignored, at least partially, and some of the tasks start spilling large
>> amounts of data to disk.  The job has been fast enough in the past, but
>> once it starts spilling to disk it lands on Miller's planet [1].
>>
>> Is this expected behavior?  Is it a misconfiguration on my part, e.g.,
>> could there be an incompatible setting that is overriding
>> `spark.shuffle.spill=false`?  Is it something that goes back to Spark
>> 1.3.1?  Is it something that goes back to EMR?  When I've allowed the job
>> to continue on for a while, I've started to see Kryo stack traces in the
>> tasks that are spilling to disk.  The stack traces mention there not being
>> enough disk space, although a `df` shows plenty of space (perhaps after the
>> fact, when temporary files have been cleaned up).
>>
>> Has anyone run into something like this before?  I would be happy to see
>> OOM errors, because that would be consistent with one understanding of what
>> might be going on, but I haven't yet.
>>
>> Eric
>>
>>
>> [1] https://www.youtube.com/watch?v=v7OVqXm7_Pk
>>
>
>
>
> --
> *Richard Marscher*
> Software Engineer
> Localytics
> Localytics.com <http://localytics.com/> | Our Blog
> <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
> Facebook <http://facebook.com/localytics> | LinkedIn
> <http://www.linkedin.com/company/1148792?trk=tyah>
>


spark.shuffle.spill=false ignored?

2015-09-03 Thread Eric Walker
Hi,

I am using Spark 1.3.1 on EMR with lots of memory.  I have attempted to run
a large pyspark job several times, specifying `spark.shuffle.spill=false`
in different ways.  It seems that the setting is ignored, at least
partially, and some of the tasks start spilling large amounts of data to
disk.  The job has been fast enough in the past, but once it starts
spilling to disk it lands on Miller's planet [1].

Is this expected behavior?  Is it a misconfiguration on my part, e.g.,
could there be an incompatible setting that is overriding
`spark.shuffle.spill=false`?  Is it something that goes back to Spark
1.3.1?  Is it something that goes back to EMR?  When I've allowed the job
to continue on for a while, I've started to see Kryo stack traces in the
tasks that are spilling to disk.  The stack traces mention there not being
enough disk space, although a `df` shows plenty of space (perhaps after the
fact, when temporary files have been cleaned up).

Has anyone run into something like this before?  I would be happy to see
OOM errors, because that would be consistent with one understanding of what
might be going on, but I haven't yet.

Eric


[1] https://www.youtube.com/watch?v=v7OVqXm7_Pk


Re: cached data between jobs

2015-09-02 Thread Eric Walker
Hi Jeff,

I think I see what you're saying.  I was thinking more of a whole Spark
job, where `spark-submit` is run once to completion and then started up
again, rather than a "job" as seen in the Spark UI.  I take it there is no
implicit caching of results between `spark-submit` runs.

(In the case I was writing about, I think I read too much into the Ganglia
network traffic view.  During the runs which I believed to be IO-bound, I
was carrying out a long-running database transfer on the same network.
After it completed I saw a speedup, not realizing where it came from, and
wondered whether there had been some kind of shifting in the data.)

Eric


On Tue, Sep 1, 2015 at 9:54 PM, Jeff Zhang <zjf...@gmail.com> wrote:

> Hi Eric,
>
> If the 2 jobs share the same parent stages, these stages can be skipped
> for the second job.
>
> Here's one simple example:
>
> val rdd1 = sc.parallelize(1 to 10).map(e=>(e,e))
> val rdd2 = rdd1.groupByKey()
> rdd2.map(e=>e._1).collect() foreach println
> rdd2.map(e=> (e._1, e._2.size)).collect foreach println
>
> Obviously, there are 2 jobs and both of them have 2 stages. Here the 2
> jobs happen to share the same stage (the first stage of each job). Although
> you don't cache this data explicitly, once a stage is completed it is
> marked as available and can be used by other jobs, so the second job only
> needs to run one stage.
> You should be able to see the skipped stage in the Spark job UI.
>
>
>
> [image: Inline image 1]
>
> On Wed, Sep 2, 2015 at 12:53 AM, Eric Walker <eric.wal...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I'm noticing that a 30-minute job that was initially IO-bound may no
>> longer be IO-bound during subsequent runs.  Is there some kind of
>> between-job caching that
>> happens in Spark or in Linux that outlives jobs and that might be making
>> subsequent runs faster?  If so, is there a way to avoid the caching in
>> order to get a better sense of the worst-case scenario?
>>
>> (It's also possible that I've simply changed something that made things
>> faster.)
>>
>> Eric
>>
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


cached data between jobs

2015-09-01 Thread Eric Walker
Hi,

I'm noticing that a 30-minute job that was initially IO-bound may no longer
be IO-bound during subsequent runs.  Is there some kind of between-job
caching that
happens in Spark or in Linux that outlives jobs and that might be making
subsequent runs faster?  If so, is there a way to avoid the caching in
order to get a better sense of the worst-case scenario?

(It's also possible that I've simply changed something that made things
faster.)

Eric


Re: bulk upload to Elasticsearch and shuffle behavior

2015-08-31 Thread Eric Walker
I think I have found out what was causing me difficulties.  It seems I was
reading too much into the stage description shown in the "Stages" tab of
the Spark application UI.  Although it said "repartition at
NativeMethodAccessorImpl.java:-2", I can infer from the network traffic,
and from how the stage responded to changes I subsequently made, that the
code actually running was the code doing the HBase lookups.  I suspect the
actual shuffle, once it occurred, required roughly the same amount of
network IO as the upload to Elasticsearch that followed.

Eric



On Mon, Aug 31, 2015 at 6:09 PM, Eric Walker <eric.wal...@gmail.com> wrote:

> Hi,
>
> I am working on a pipeline that carries out a number of stages, the last
> of which is to build some large JSON objects from information in the
> preceding stages.  The JSON objects are then uploaded to Elasticsearch in
> bulk.
>
> If I carry out a shuffle via a `repartition` call after the JSON documents
> have been created, the upload to ES is fast.  But the shuffle itself takes
> many tens of minutes and is IO-bound.
>
> If I omit the repartition, the upload to ES takes a long time due to a
> complete lack of parallelism.
>
> Currently, the step that precedes the assembling of the JSON documents,
> which goes into the final repartition call, is the querying of pairs of
> object ids.  In a mapper the ids are resolved to documents by querying
> HBase.  The initial pairs of ids are obtained via a query against the SQL
> context, and the query result is repartitioned before going into the mapper
> that resolves the ids into documents.
>
> It's not clear to me why the final repartition preceding the upload to ES
> is required.  I would like to omit it, since it is so expensive and
> involves so much network IO, but have not found a way to do this yet.  If I
> omit the repartition, the job takes much longer.
>
> Does anyone know what might be going on here, and what I might be able to
> do to get rid of the last `repartition` call before the upload to ES?
>
> Eric
>
>


bulk upload to Elasticsearch and shuffle behavior

2015-08-31 Thread Eric Walker
Hi,

I am working on a pipeline that carries out a number of stages, the last of
which is to build some large JSON objects from information in the preceding
stages.  The JSON objects are then uploaded to Elasticsearch in bulk.

If I carry out a shuffle via a `repartition` call after the JSON documents
have been created, the upload to ES is fast.  But the shuffle itself takes
many tens of minutes and is IO-bound.

If I omit the repartition, the upload to ES takes a long time due to a
complete lack of parallelism.

Currently, the step that precedes the assembling of the JSON documents,
which goes into the final repartition call, is the querying of pairs of
object ids.  In a mapper the ids are resolved to documents by querying
HBase.  The initial pairs of ids are obtained via a query against the SQL
context, and the query result is repartitioned before going into the mapper
that resolves the ids into documents.
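
For concreteness, a rough sketch of how the tail end of the pipeline is
structured (all names and partition counts below are made up, and
elasticsearch-hadoop's saveJsonToEs is assumed for the bulk write):

import org.apache.spark.sql.Row
import org.elasticsearch.spark._

// Placeholder for the step that resolves an id pair against HBase and
// assembles the JSON document; the real version does HBase lookups.
def buildJsonDocument(idPair: Row): String =
  s"""{"left_id": "${idPair.getString(0)}", "right_id": "${idPair.getString(1)}"}"""

val idPairs = sqlContext.sql("select left_id, right_id from id_pairs")  // made-up query
val documents = idPairs.rdd
  .repartition(512)                    // spread out the HBase lookups
  .map(buildJsonDocument)
documents
  .repartition(512)                    // the expensive, IO-bound shuffle in question
  .saveJsonToEs("my_index/my_type")    // made-up index/type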

It's not clear to me why the final repartition preceding the upload to ES
is required.  I would like to omit it, since it is so expensive and
involves so much network IO, but have not found a way to do this yet.  If I
omit the repartition, the job takes much longer.

Does anyone know what might be going on here, and what I might be able to
do to get rid of the last `repartition` call before the upload to ES?

Eric


registering an empty RDD as a temp table in a PySpark SQL context

2015-08-17 Thread Eric Walker
I have an RDD queried from a scan of a data source.  Sometimes the RDD has
rows and at other times it has none.  I would like to register this RDD as
a temporary table in a SQL context.  I suspect this will work in Scala, but
in PySpark some code assumes that the RDD has rows in it, which are used to
verify the schema:

https://github.com/apache/spark/blob/branch-1.3/python/pyspark/sql/context.py#L299

Before I attempt to extend the Scala code to handle an empty RDD or provide
an empty DataFrame that can be registered, I was wondering what people
recommend in this case.  Perhaps there's a simple way of registering an
empty RDD as a temporary table in a PySpark SQL context that I'm
overlooking.
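
For reference, the Scala side of this does appear to handle the empty case,
since the schema is supplied explicitly rather than inferred from sample
rows; a minimal sketch with a made-up two-column schema:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Hypothetical schema; the point is only that no rows are needed to build
// and register an empty DataFrame when the schema is given explicitly.
val schema = StructType(Seq(StructField("id", StringType), StructField("value", StringType)))
val emptyDf = sqlContext.createDataFrame(sc.emptyRDD[Row], schema)
emptyDf.registerTempTable("possibly_empty")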

An alternative is to add special case logic in the client code to deal with
an RDD backed by an empty table scan.  But since the SQL will already
handle that, I was hoping to avoid special case logic.

Eric


adding a custom Scala RDD for use in PySpark

2015-08-11 Thread Eric Walker
Hi,

I'm new to Scala, Spark and PySpark, and I have a question about what
approach to take for the problem I'm trying to solve.

I have noticed that working with HBase tables read in using
`newAPIHadoopRDD` can be quite slow with large data sets when one is
interested in only a small subset of the keyspace.  A prefix scan on the
underlying HBase table in this case takes 11 seconds, while a filter
applied to the full RDD returned by `newAPIHadoopRDD` takes 33 minutes.

I looked around and found no way to specify a prefix scan from the Python
interface.  So I have written a Scala class that can be passed an argument,
which then constructs a scan object, calls `newAPIHadoopRDD` from Scala
with the scan object and feeds the resulting RDD back to Python.
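
Roughly, the Scala side of such a helper can be sketched as follows (the
table name and key prefix are made up, and a start/stop key pair stands in
for a true prefix scan):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(TableInputFormat.INPUT_TABLE, "my_table")     // made-up table name
// Restrict the scan to the key range covered by the prefix instead of
// scanning the full table.
hbaseConf.set(TableInputFormat.SCAN_ROW_START, "myprefix")  // made-up prefix
hbaseConf.set(TableInputFormat.SCAN_ROW_STOP, "myprefiy")   // first key past the prefix

val rdd = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])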

It took a few twists and turns to get this to work.  A final challenge was
the fact that `org.apache.spark.api.python.SerDeUtil` is private.  This
suggests to me that I'm doing something wrong, although I got it to work
with sufficient hackery.

What do people recommend for a general approach in getting PySpark RDDs
from HBase prefix scans?  I hope I have not missed something obvious.

Eric