Hi Gerard,
I've gone with option 1, and it seems to be working well. Option 2 is also quite 
interesting. Thanks for your help with this.

Regards,
Ashic.

From: [email protected]
Date: Thu, 23 Oct 2014 17:07:56 +0200
Subject: Re: Spark Cassandra Connector proper usage
To: [email protected]
CC: [email protected]

Hi Ashic,
At the moment I see two options: 
1) You could use the CassandraConnector object to execute your specialized 
query. The recommended pattern is to do that within an rdd.foreachPartition(...) 
in order to amortize DB connection setup over the number of elements in one 
partition. Something like this:

val sparkContext = ???
val cassandraConnector = CassandraConnector(conf)
val dataRdd = ??? // I assume this is the source of data
val rddThingById = dataRdd.map(elem => transformToIdByThing(elem))

rddThingById.foreachPartition(partition => {
  cassandraConnector.withSessionDo { session =>
    // bind order matches the ? markers: things first, then id
    partition.foreach(record =>
      session.execute("update foo set things = things + ? where id = ?", record.thing, record.id))
  }
})
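
As a side note, if the per-row execute becomes a bottleneck, the same pattern 
works with a statement prepared once per partition and bound per record. A rough 
sketch (assuming record.thing is the element to append, record.id is the key, 
and things is a set column as in your schema below):

import scala.collection.JavaConverters._

rddThingById.foreachPartition(partition => {
  cassandraConnector.withSessionDo { session =>
    // prepare once per partition, execute once per record
    val stmt = session.prepare("update foo set things = things + ? where id = ?")
    partition.foreach { record =>
      // the set-append bind marker expects a collection, hence the single-element Set
      session.execute(stmt.bind(Set(record.thing).asJava, record.id))
    }
  }
})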
2) You could change your datamodel slightly in order to avoid the update 
operation.
Actually, the Cassandra representation of a set is nothing more than column -> 
timestamp entries, where the column name is an element of the set. So Set(a, b, c) = 
Column(a) -> ts, Column(b) -> ts, Column(c) -> ts.

So, if you desugarize your datamodel, you could use something like:

create table foo (
  id text,
  bar int,
  things text,
  ts timestamp,
  primary key ((id, bar), things)
)
And your Spark process would be reduced to:

val sparkContext = ???
val dataRdd = ??? // I assume this is the source of data
dataRdd.map(elem => transformToIdBarThingByTimeStamp(elem))
  .saveToCassandra(ks, "foo", SomeColumns("id", "bar", "things", "ts"))
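
With that model, reading the "set" back is just a query over the partition, one 
row per element. Purely as an illustration (values made up):

select things, ts from foo where id = 'some-id' and bar = 1;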

Hope this helps.
-kr, Gerard.

On Thu, Oct 23, 2014 at 2:48 PM, Ashic Mahtab <[email protected]> wrote:

Hi Gerard,
Thanks for the response. Here's the scenario:

The target cassandra schema looks like this:

create table foo (
   id text primary key,
   bar int,
   things set<text>
)

The source in question is a Sql Server source providing the necessary data. The 
source goes over the same "id" multiple times adding things to the "things" set 
each time. With inserts, it'll replace "things" with a new set of one element, 
instead of appending that item. As such, the query

update foo set things = things + ? where id=?

solves the problem. If I had to stick with saveToCassandra, I'd have to read in 
the existing row for each incoming row and then write it back. Since this is 
happening in parallel on multiple machines, that would likely cause 
discrepancies where a node reads stale values and writes them back. Hence my 
question about session management in order to issue custom update queries.
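
To illustrate with made-up values: an insert (which is effectively what 
saveToCassandra does) replaces the whole set, while the update appends 
server-side with no read required:

insert into foo (id, bar, things) values ('abc', 1, {'x'});  -- things is now {'x'}
update foo set things = things + {'y'} where id = 'abc';     -- things is now {'x', 'y'}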

Thanks,
Ashic.

Date: Thu, 23 Oct 2014 14:27:47 +0200
Subject: Re: Spark Cassandra Connector proper usage
From: [email protected]
To: [email protected]

Ashic,

With the Spark Cassandra connector you would typically create an RDD from the 
source table, update what you need, filter out what you don't update, and write 
it back to Cassandra.
Kr, Gerard
On Oct 23, 2014 1:21 PM, "Ashic Mahtab" <[email protected]> wrote:

I'm looking to use Spark for some ETL, which will mostly consist of "update" 
statements (a column is a set that will be appended to, so a simple insert is 
likely not going to work). As such, it seems like issuing CQL queries to import 
the data is the best option. Using the Spark Cassandra Connector, I see I can 
do this:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/1_connecting.md#connecting-manually-to-cassandra
Now I don't want to open a session and close it for every row in the source (am 
I right in not wanting this? Usually, I have one session for the entire 
process, and keep using that in "normal" apps). However, it says that the 
connector is serializable, but the session is obviously not. So, wrapping the 
whole import inside a single "withSessionDo" seems like it'll cause problems. I 
was thinking of using something like this:
class CassandraStorage(conf: SparkConf) {
  val session = CassandraConnector(conf).openSession()
  def store(t: Thingy): Unit = {
    // session.execute cql goes here
  }
}

Is this a good approach? Do I need to worry about closing the session? Where / 
how best would I do that? Any pointers are appreciated.
Thanks,
Ashic.