Re: Cassandra row count grouped by multiple columns
Hi Chirag,

Maybe something like this?

import org.apache.spark.sql._
import org.apache.spark.sql.types._

val rdd = sc.parallelize(Seq(
  Row("A1", "B1", "C1"),
  Row("A2", "B2", "C2"),
  Row("A3", "B3", "C2"),
  Row("A1", "B1", "C1")
))
val schema = StructType(Seq("a", "b", "c").map(c => StructField(c, StringType)))
val df = sqlContext.createDataFrame(rdd, schema)
df.registerTempTable("rows")
sqlContext.sql("select a, b, c, count(0) as count from rows group by a, b, c").collect()

Eric

On Thu, Sep 10, 2015 at 2:19 AM, Chirag Dewan wrote:
> Hi,
>
> I am using Spark 1.2.0 with Cassandra 2.0.14. I have a problem where I
> need a count of rows unique to multiple columns.
>
> So I have a column family with 3 columns, i.e. a, b, c, and for each
> distinct combination of values (a1, b1, c1) I want the row count.
>
> For example, given the rows:
>
> A1,B1,C1
> A2,B2,C2
> A3,B3,C2
> A1,B1,C1
>
> the output should be:
>
> A1,B1,C1,2
> A2,B2,C2,1
> A3,B3,C2,1
>
> What is the optimum way of achieving this?
>
> Thanks in advance.
>
> Chirag
Re: spark.shuffle.spill=false ignored?
Hi Richard,

I've stepped away from this issue since I raised my question. One additional detail that wasn't known at the time: the application did not run out of disk space every time spilling to disk occurred; that problem appears to have been a one-off. The main challenge was that the spark.shuffle.spill setting seemed to be ignored, which might have been the expected behavior given the skew in the data. More generally, attempts to tweak the application's behavior using settings such as spark.python.worker.memory and spark.shuffle.memoryFraction had no observable effect. It is possible that the apparent ignoring of the spark.shuffle.spill setting was just one manifestation of a larger misconfiguration.

Eric

On Wed, Sep 9, 2015 at 4:48 PM, Richard Marscher <rmarsc...@localytics.com> wrote:
> Hi Eric,
>
> I just wanted to do a sanity check: do you know what paths it is trying
> to write to? I ask because even without spilling, shuffles always write to
> disk first before transferring data across the network. I had at one point
> encountered this myself where we accidentally had /tmp mounted on a tiny
> disk and kept running out of disk on shuffles even though we also don't
> spill. You may have already considered or ruled this out, though.
>
> On Thu, Sep 3, 2015 at 12:56 PM, Eric Walker <eric.wal...@gmail.com> wrote:
>
>> Hi,
>>
>> I am using Spark 1.3.1 on EMR with lots of memory. I have attempted to
>> run a large pyspark job several times, specifying
>> `spark.shuffle.spill=false` in different ways. It seems that the setting
>> is ignored, at least partially, and some of the tasks start spilling
>> large amounts of data to disk. The job has been fast enough in the past,
>> but once it starts spilling to disk it lands on Miller's planet [1].
>>
>> Is this expected behavior? Is it a misconfiguration on my part, e.g.,
>> could there be an incompatible setting that is overriding
>> `spark.shuffle.spill=false`? Is it something that goes back to Spark
>> 1.3.1? Is it something that goes back to EMR? When I've allowed the job
>> to continue for a while, I've started to see Kryo stack traces in the
>> tasks that are spilling to disk. The stack traces mention there not being
>> enough disk space, although a `df` shows plenty of space (perhaps after
>> the fact, once temporary files have been cleaned up).
>>
>> Has anyone run into something like this before? I would be happy to see
>> OOM errors, because that would be consistent with one understanding of
>> what might be going on, but I haven't yet.
>>
>> Eric
>>
>> [1] https://www.youtube.com/watch?v=v7OVqXm7_Pk
>>
>
> --
> Richard Marscher
> Software Engineer
> Localytics
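Related to Richard's point about where shuffle files land, one knob worth checking is spark.local.dir, which controls the scratch space used for shuffle and spill files. A minimal sketch follows, assuming a plain standalone or local setup; the paths and application name are placeholders, and on YARN or EMR the cluster's own local-dirs configuration typically takes precedence:

    from pyspark import SparkConf, SparkContext

    # Placeholder paths: point shuffle/spill scratch space at larger volumes.
    # Note: on YARN the node manager's local-dirs setting usually overrides this.
    conf = (SparkConf()
            .setAppName("shuffle-scratch-dirs")
            .set("spark.local.dir", "/mnt/spark,/mnt2/spark"))
    sc = SparkContext(conf=conf)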
spark.shuffle.spill=false ignored?
Hi,

I am using Spark 1.3.1 on EMR with lots of memory. I have attempted to run a large pyspark job several times, specifying `spark.shuffle.spill=false` in different ways. It seems that the setting is ignored, at least partially, and some of the tasks start spilling large amounts of data to disk. The job has been fast enough in the past, but once it starts spilling to disk it lands on Miller's planet [1].

Is this expected behavior? Is it a misconfiguration on my part, e.g., could there be an incompatible setting that is overriding `spark.shuffle.spill=false`? Is it something that goes back to Spark 1.3.1? Is it something that goes back to EMR? When I've allowed the job to continue for a while, I've started to see Kryo stack traces in the tasks that are spilling to disk. The stack traces mention there not being enough disk space, although a `df` shows plenty of space (perhaps after the fact, once temporary files have been cleaned up).

Has anyone run into something like this before? I would be happy to see OOM errors, because that would be consistent with one understanding of what might be going on, but I haven't yet.

Eric

[1] https://www.youtube.com/watch?v=v7OVqXm7_Pk
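For reference, a minimal sketch of one common way such settings are passed to a PySpark driver; the application name and values below are placeholders rather than the configuration the job above actually used, and the same keys can also be given to spark-submit via --conf:

    from pyspark import SparkConf, SparkContext

    # Placeholder values for illustration only.
    conf = (SparkConf()
            .setAppName("shuffle-spill-test")
            .set("spark.shuffle.spill", "false")
            .set("spark.shuffle.memoryFraction", "0.4")
            .set("spark.python.worker.memory", "1g"))
    sc = SparkContext(conf=conf)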
Re: cached data between jobs
Hi Jeff,

I think I see what you're saying. I was thinking more of a whole Spark job, where `spark-submit` is run once to completion and then started up again, rather than a "job" as seen in the Spark UI. I take it there is no implicit caching of results between `spark-submit` runs.

(In the case I was writing about, I think I read too much into the Ganglia network traffic view. During the runs that I believed to be IO-bound, I was carrying out a long-running database transfer on the same network. After it completed I saw a speedup and, not realizing where it came from, wondered whether there had been some kind of shifting in the data.)

Eric

On Tue, Sep 1, 2015 at 9:54 PM, Jeff Zhang <zjf...@gmail.com> wrote:
> Hi Eric,
>
> If the 2 jobs share the same parent stages, these stages can be skipped
> for the second job.
>
> Here's one simple example:
>
> val rdd1 = sc.parallelize(1 to 10).map(e => (e, e))
> val rdd2 = rdd1.groupByKey()
> rdd2.map(e => e._1).collect().foreach(println)
> rdd2.map(e => (e._1, e._2.size)).collect().foreach(println)
>
> Obviously, there are 2 jobs and both of them have 2 stages. Luckily,
> these 2 jobs share the same stage (the first stage of each job). Although
> you don't cache the data explicitly, once one stage is completed it is
> marked as available and can be used by other jobs, so the second job only
> needs to run one stage.
> You should be able to see the skipped stage in the Spark job UI.
>
> On Wed, Sep 2, 2015 at 12:53 AM, Eric Walker <eric.wal...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm noticing that a 30 minute job that was initially IO-bound may not be
>> during subsequent runs. Is there some kind of between-job caching that
>> happens in Spark or in Linux that outlives jobs and that might be making
>> subsequent runs faster? If so, is there a way to avoid the caching in
>> order to get a better sense of the worst-case scenario?
>>
>> (It's also possible that I've simply changed something that made things
>> faster.)
>>
>> Eric
>>
>
> --
> Best Regards
>
> Jeff Zhang
cached data between jobs
Hi,

I'm noticing that a 30 minute job that was initially IO-bound may not be during subsequent runs. Is there some kind of between-job caching that happens in Spark or in Linux that outlives jobs and that might be making subsequent runs faster? If so, is there a way to avoid the caching in order to get a better sense of the worst-case scenario?

(It's also possible that I've simply changed something that made things faster.)

Eric
Re: bulk upload to Elasticsearch and shuffle behavior
I think I have found out what was causing me difficulties. It seems I was reading too much into the stage description shown in the "Stages" tab of the Spark application UI. While it said "repartition at NativeMethodAccessorImpl.java:-2", I can infer from the network traffic, and from the job's response to changes I subsequently made, that the code actually running was the code doing the HBase lookups. I suspect the actual shuffle, once it occurred, required network IO on the same order as the upload to Elasticsearch that followed.

Eric

On Mon, Aug 31, 2015 at 6:09 PM, Eric Walker <eric.wal...@gmail.com> wrote:
> Hi,
>
> I am working on a pipeline that carries out a number of stages, the last
> of which is to build some large JSON objects from information in the
> preceding stages. The JSON objects are then uploaded to Elasticsearch in
> bulk.
>
> If I carry out a shuffle via a `repartition` call after the JSON
> documents have been created, the upload to ES is fast. But the shuffle
> itself takes many tens of minutes and is IO-bound.
>
> If I omit the repartition, the upload to ES takes a long time due to a
> complete lack of parallelism.
>
> Currently, the step that precedes the assembling of the JSON documents,
> which goes into the final repartition call, is the querying of pairs of
> object ids. In a mapper the ids are resolved to documents by querying
> HBase. The initial pairs of ids are obtained via a query against the SQL
> context, and the query result is repartitioned before going into the
> mapper that resolves the ids into documents.
>
> It's not clear to me why the final repartition preceding the upload to ES
> is required. I would like to omit it, since it is so expensive and
> involves so much network IO, but have not found a way to do this yet. If
> I omit the repartition, the job takes much longer.
>
> Does anyone know what might be going on here, and what I might be able to
> do to get rid of the last `repartition` call before the upload to ES?
>
> Eric
bulk upload to Elasticsearch and shuffle behavior
Hi,

I am working on a pipeline that carries out a number of stages, the last of which is to build some large JSON objects from information in the preceding stages. The JSON objects are then uploaded to Elasticsearch in bulk.

If I carry out a shuffle via a `repartition` call after the JSON documents have been created, the upload to ES is fast. But the shuffle itself takes many tens of minutes and is IO-bound.

If I omit the repartition, the upload to ES takes a long time due to a complete lack of parallelism.

Currently, the step that precedes the assembling of the JSON documents, which goes into the final repartition call, is the querying of pairs of object ids. In a mapper the ids are resolved to documents by querying HBase. The initial pairs of ids are obtained via a query against the SQL context, and the query result is repartitioned before going into the mapper that resolves the ids into documents.

It's not clear to me why the final repartition preceding the upload to ES is required. I would like to omit it, since it is so expensive and involves so much network IO, but have not found a way to do this yet. If I omit the repartition, the job takes much longer.

Does anyone know what might be going on here, and what I might be able to do to get rid of the last `repartition` call before the upload to ES?

Eric
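For context, here is a rough sketch of the kind of save the last step performs, using the elasticsearch-hadoop output format from PySpark. This is not the pipeline's actual code: it assumes the elasticsearch-hadoop jar is on the classpath, that `docs` is an RDD of (id, json_string) pairs of already-assembled documents, and the cluster address, index name, and partition count are placeholders:

    # Sketch only; names and values are placeholders.
    es_conf = {
        "es.nodes": "es-host:9200",         # placeholder Elasticsearch address
        "es.resource": "my_index/my_type",  # placeholder index/type
        "es.input.json": "yes",             # values are pre-serialized JSON strings
    }

    (docs
     .repartition(200)                      # the expensive shuffle in question
     .saveAsNewAPIHadoopFile(
         path="-",
         outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
         keyClass="org.apache.hadoop.io.NullWritable",
         valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
         conf=es_conf))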
registering an empty RDD as a temp table in a PySpark SQL context
I have an RDD queried from a scan of a data source. Sometimes the RDD has rows and at other times it has none. I would like to register this RDD as a temporary table in a SQL context.

I suspect this will work in Scala, but in PySpark some code assumes that the RDD has rows in it, which are used to verify the schema:

https://github.com/apache/spark/blob/branch-1.3/python/pyspark/sql/context.py#L299

Before I attempt to extend the Scala code to handle an empty RDD, or provide an empty DataFrame that can be registered, I was wondering what people recommend in this case. Perhaps there's a simple way of registering an empty RDD as a temporary table in a PySpark SQL context that I'm overlooking.

An alternative is to add special-case logic in the client code to deal with an RDD backed by an empty table scan. But since the SQL will already handle that, I was hoping to avoid special-case logic.

Eric
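One possible workaround, sketched below under the assumption that a single dummy row matching the schema can be supplied: build a one-row DataFrame so the row-based schema verification has something to inspect, then truncate it with limit(0) before registering it. The function name, schema, and values are made up for illustration and this is not a tested recipe:

    from pyspark.sql.types import StructType, StructField, StringType

    def register_temp_table(sqlContext, rdd, schema, dummy_row, name):
        # dummy_row is a single tuple matching `schema`; it exists only to
        # satisfy the row-based schema verification and is dropped right away.
        if rdd.take(1):
            df = sqlContext.createDataFrame(rdd, schema)
        else:
            df = sqlContext.createDataFrame([dummy_row], schema).limit(0)
        df.registerTempTable(name)

    # Usage in the PySpark shell, where `sc` and `sqlContext` already exist:
    schema = StructType([StructField("a", StringType()), StructField("b", StringType())])
    empty_rdd = sc.parallelize([("x", "y")]).filter(lambda r: False)  # stand-in for an empty scan
    register_temp_table(sqlContext, empty_rdd, schema, ("x", "y"), "rows")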
adding a custom Scala RDD for use in PySpark
Hi,

I'm new to Scala, Spark and PySpark and have a question about what approach to take in the problem I'm trying to solve.

I have noticed that working with HBase tables read in using `newAPIHadoopRDD` can be quite slow with large data sets when one is interested in only a small subset of the keyspace. A prefix scan on the underlying HBase table in this case takes 11 seconds, while a filter applied to the full RDD returned by `newAPIHadoopRDD` takes 33 minutes.

I looked around and found no way to specify a prefix scan from the Python interface. So I have written a Scala class that can be passed an argument, which then constructs a scan object, calls `newAPIHadoopRDD` from Scala with the scan object and feeds the resulting RDD back to Python. It took a few twists and turns to get this to work. A final challenge was the fact that `org.apache.spark.api.python.SerDeUtil` is private. This suggests to me that I'm doing something wrong, although I got it to work with sufficient hackery.

What do people recommend as a general approach to getting PySpark RDDs from HBase prefix scans? I hope I have not missed something obvious.

Eric
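For comparison, here is a sketch of one way a key-range restriction can sometimes be pushed down without custom Scala code, by passing TableInputFormat's scan properties to `newAPIHadoopRDD` from Python. Whether these properties are honored depends on the HBase version, and the converter classes come from the Spark examples project, so the quorum, table name, prefix, and classpath contents below are all assumptions rather than a tested recipe:

    # Assumes the HBase client jars and the Spark examples' Python converter
    # classes are on the classpath, and that row keys are plain strings.
    prefix = "user123|"                                  # placeholder key prefix
    stop_row = prefix[:-1] + chr(ord(prefix[-1]) + 1)    # first key past the prefix range

    conf = {
        "hbase.zookeeper.quorum": "zk-host",             # placeholder
        "hbase.mapreduce.inputtable": "my_table",        # placeholder
        "hbase.mapreduce.scan.row.start": prefix,
        "hbase.mapreduce.scan.row.stop": stop_row,
    }

    rdd = sc.newAPIHadoopRDD(
        "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
        "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "org.apache.hadoop.hbase.client.Result",
        keyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
        valueConverter="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter",
        conf=conf)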