Hi Ashic,
At the moment I see two options:
1) You could use the CassandraConnector object to execute your specialized
query. The recommended pattern is to do that within an
rdd.foreachPartition(...), so that the cost of setting up the DB connection
is amortized over the number of elements in one partition. Something like
this:
val sparkContext = ???
val cassandraConnector = CassandraConnector(conf)
val dataRdd = ??? // I assume this is the source of data
val rddThingById = dataRdd.map(elem => transformToIdByThing(elem))
rddThingById.foreachPartition { partition =>
  cassandraConnector.withSessionDo { session =>
    partition.foreach(record =>
      // bound values follow placeholder order; note that the value added
      // with "things + ?" must itself be bound as a (single-element) set
      session.execute("update foo set things = things + ? where id = ?",
        record.thing, record.id))
  }
}
2) You could change your data model slightly in order to avoid the update
operation altogether.
Internally, the Cassandra representation of a set is nothing more than a map
of column -> timestamp, where each column name is an element of the set.
So Set(a, b, c) = Column(a) -> ts, Column(b) -> ts, Column(c) -> ts
So, if you desugar your data model this way, you could use something like:
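A rough plain-Scala sketch of that desugaring (the names here are
hypothetical, just to illustrate the shape of the data, not connector API):

```scala
// Each set element becomes its own (partition key, element, timestamp) entry,
// mirroring how Cassandra stores one cell per set member.
def desugarSet(id: String, things: Set[String], ts: Long): Seq[(String, String, Long)] =
  things.toSeq.sorted.map(thing => (id, thing, ts))
```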
create table foo (
  id text,
  bar int,
  thing text,
  ts timestamp,
  primary key ((id, bar), thing)
)
And your Spark process would be reduced to:
val sparkContext = ???
val dataRdd = ??? // I assume this is the source of data
dataRdd.map(elem => transformToIdBarThingByTimeStamp(elem))
  .saveToCassandra(ks, "foo", SomeColumns("id", "bar", "thing", "ts"))
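For completeness, a minimal sketch of what transformToIdBarThingByTimeStamp
could look like. The record type and field names are assumptions on my part,
since the real shape comes from your SQL Server source:

```scala
// Hypothetical shape of a row coming out of the SQL Server source.
case class SourceRecord(id: String, bar: Int, thing: String, ts: Long)

// Emit one Cassandra row per source record: (id, bar, thing, ts).
// Re-processing the same id simply produces additional rows, so no
// read-modify-write of a set column is needed.
def transformToIdBarThingByTimeStamp(e: SourceRecord): (String, Int, String, Long) =
  (e.id, e.bar, e.thing, e.ts)
```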
Hope this helps.
-kr, Gerard.
On Thu, Oct 23, 2014 at 2:48 PM, Ashic Mahtab <[email protected]> wrote:
> Hi Gerard,
> Thanks for the response. Here's the scenario:
>
> The target cassandra schema looks like this:
>
> create table foo (
> id text primary key,
> bar int,
> things set<text>
> )
>
> The source in question is a Sql Server source providing the necessary
> data. The source goes over the same "id" multiple times adding things to
> the "things" set each time. With inserts, it'll replace "things" with a new
> set of one element, instead of appending that item. As such, the query
>
> update foo set things = things + ? where id=?
>
> solves the problem. If I had to stick with saveToCassandra, I'd have to
> read in the existing row for each row, and then write it back. Since this
> is happening in parallel on multiple machines, that would likely cause
> discrepancies where a node will read and update to older values. Hence my
> question about session management in order to issue custom update queries.
>
> Thanks,
> Ashic.
>
> ------------------------------
> Date: Thu, 23 Oct 2014 14:27:47 +0200
> Subject: Re: Spark Cassandra Connector proper usage
> From: [email protected]
> To: [email protected]
>
>
> Ashic,
> With the Spark-cassandra connector you would typically create an RDD from
> the source table, update what you need, filter out what you don't update
> and write it back to Cassandra.
>
> Kr, Gerard
> On Oct 23, 2014 1:21 PM, "Ashic Mahtab" <[email protected]> wrote:
>
> I'm looking to use spark for some ETL, which will mostly consist of
> "update" statements (a column is a set, that'll be appended to, so a simple
> insert is likely not going to work). As such, it seems like issuing CQL
> queries to import the data is the best option. Using the Spark Cassandra
> Connector, I see I can do this:
>
>
> https://github.com/datastax/spark-cassandra-connector/blob/master/doc/1_connecting.md#connecting-manually-to-cassandra
>
> Now I don't want to open a session and close it for every row in the
> source (am I right in not wanting this? Usually, I have one session for the
> entire process, and keep using that in "normal" apps). However, it says
> that the connector is serializable, but the session is obviously not. So,
> wrapping the whole import inside a single "withSessionDo" seems like it'll
> cause problems. I was thinking of using something like this:
>
>
> class CassandraStorage(conf: SparkConf) {
>   val session = CassandraConnector(conf).openSession()
>   def store(t: Thingy): Unit = {
>     // session.execute cql goes here
>   }
> }
>
>
> Is this a good approach? Do I need to worry about closing the session?
> Where / how best would I do that? Any pointers are appreciated.
>
>
> Thanks,
>
> Ashic.
>
>