I tried the spark-cassandra-connector. Our data in the source table has a TTL, but saveToCassandra does not write a TTL by default.
Fortunately we have a timestamp field that indicates the insert time. However, TTLOption.perRow must be based on a column; it cannot query a column and then compute the TTL from it on the fly. So before saveToCassandra, I map each row to a new case class that has a ttl field (the last one) which can be used directly in TTLOption.perRow:
import com.datastax.spark.connector._
import com.datastax.spark.connector.writer.{TTLOption, WriteConf}

case class Velocity(attribute: String, partner_code: String,
    app_name: String, attr_type: String, timestamp: Long, ttl: Int)

def localTest(tbl: String): Unit = {
  val velocitySrcTbl = sc.cassandraTable("ks", "velocity")
    .filter(row => (row.getLong("timestamp"): java.lang.Long) != null)
  val nowlong = System.currentTimeMillis()
  val now = (nowlong / 1000).toInt
  val velocityRDD = velocitySrcTbl.map(row => {
    val ts = (row.getLong("timestamp") / 1000).toInt
    Velocity(
      row.getString("attribute"),
      row.getString("partner_code"),
      row.getString("app_name"),
      row.getString("type"),
      row.getLong("timestamp"),
      90 * 86400 - (now - ts) // compute the ttl and use it directly via TTLOption.perRow
    )
  })
  velocityRDD.saveToCassandra("forseti", tbl,
    SomeColumns("attribute", "partner_code", "app_name", "type" as "attr_type", "timestamp"),
    writeConf = WriteConf(ttl = TTLOption.perRow("ttl")))
}
But something goes wrong here:
WARN scheduler.TaskSetManager: Lost task 1.3 in stage 16.0 (TID 87, 192.168.6.53): java.lang.NullPointerException: Unexpected null value of column 5. Use get[Option[...]] to receive null values.
I already filter out rows whose column 5 (the timestamp field) is null, so why does this exception still happen? I also tried getLongOption, but the exception still occurs.
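For reference, a null-safe version of the read would look roughly like this (a minimal sketch, reusing the now value computed in the function above; note that getLong throws exactly this NPE on a null cell, so a filter built on getLong can itself raise it, and if some other column is null the same Option pattern applies to it too):

// Sketch: getLongOption returns None for null cells instead of
// throwing, and flatMap drops those rows entirely.
val velocityRDD = sc.cassandraTable("ks", "velocity")
  .flatMap { row =>
    row.getLongOption("timestamp").map { tsMillis =>
      val ts = (tsMillis / 1000).toInt
      Velocity(
        row.getString("attribute"),
        row.getString("partner_code"),
        row.getString("app_name"),
        row.getString("type"),
        tsMillis,
        90 * 86400 - (now - ts))
    }
  }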
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md
At first I wanted to open an issue on the spark-cassandra-connector project, but issues are not enabled there, so I am asking here.
Tks, qihuang.zheng
Original Message
From: DuyHai [email protected]
To: [email protected]
Sent: Thursday, October 22, 2015, 19:50
Subject: Re: C* Table Changed and Data Migration with new primary key
Use Spark to distribute the job of copying data all over the cluster and help accelerate the migration. The Spark connector does auto-paging in the background with the Java driver.
On 22 Oct 2015 at 11:03, "qihuang.zheng" [email protected] wrote:
I tried using the Java driver with auto-paging queries (setFetchSize) instead of the token function, since Cassandra already has this feature; a sketch of that style of read is below.
Reference: http://www.datastax.com/dev/blog/client-side-improvements-in-cassandra-2-0
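Roughly, the paging read looks like this (a sketch against the Java driver 2.x API; the contact point and fetch size are placeholders):

// The driver fetches the next page transparently while the
// ResultSet is being iterated, so no manual token paging is needed.
import com.datastax.driver.core.{Cluster, SimpleStatement}
import scala.collection.JavaConverters._

val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
val session = cluster.connect("ks")
val stmt = new SimpleStatement("SELECT * FROM test1")
stmt.setFetchSize(1000) // rows per page
for (row <- session.execute(stmt).iterator().asScala) {
  // read the row and re-insert into the split tables here
}
cluster.close()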
But when I tried it in a test environment with only 1 million rows, reading and then inserting into 3 tables was too slow. After running for 20 minutes a NoHostAvailableException occurred, and of course the data sync did not complete.
And our production environment has nearly 25 billion rows, which makes this approach unacceptable. Are there other ways?
Thanks & Regards,
qihuang.zheng
Original Message
From: Jeff [email protected]
To: [email protected], [email protected]
Sent: Thursday, October 22, 2015, 13:52
Subject: Re: C* Table Changed and Data Migration with new primary key
Because the data format has changed, you’ll need to read it out and write it
back in again.
This means using either a driver (Java, Python, C++, etc.) or something like Spark.
In either case, split up the token range so you can parallelize it for
significant speed improvements.
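One way to cut up the range (a sketch for the Murmur3Partitioner, whose tokens span -2^63 to 2^63 - 1; the split count is arbitrary, and each resulting query can run on its own worker):

// Split the full token range into disjoint slices and build one
// range-restricted query per slice.
val numSplits = 100
val min = BigInt(Long.MinValue)
val max = BigInt(Long.MaxValue)
val step = (max - min + 1) / numSplits
val queries = (0 until numSplits).map { i =>
  val lo = min + step * i
  val hi = if (i == numSplits - 1) max else lo + step - 1
  s"""SELECT attribute, partner, app, "timestamp", event FROM test1""" +
    s" WHERE token(attribute) >= $lo AND token(attribute) <= $hi"
}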
From: "qihuang.zheng"
Reply-To: "[email protected]"
Date: Wednesday, October 21, 2015 at 6:18 PM
To: user
Subject: C* Table Changed and Data Migration with new primary key
Hi All:
We have a table defined with only one partition key and several clustering keys:
CREATE TABLE test1 (
  attribute text,
  partner text,
  app text,
  "timestamp" bigint,
  event text,
  PRIMARY KEY ((attribute), partner, app, "timestamp")
)
And now we want to split the original test1 table into 3 tables like this:
test_global:  PRIMARY KEY ((attribute), "timestamp")
test_partner: PRIMARY KEY ((attribute, partner), "timestamp")
test_app:     PRIMARY KEY ((attribute, partner, app), "timestamp")
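For instance, test_global might be declared like this (a sketch; the non-key columns are assumed from test1, and the DESC clustering order makes the latest-first read the table's natural order):

CREATE TABLE test_global (
  attribute text,
  "timestamp" bigint,
  event text,
  PRIMARY KEY ((attribute), "timestamp")
) WITH CLUSTERING ORDER BY ("timestamp" DESC);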
We split the original table because querying global data by timestamp desc, like this:
select * from test1 where attribute=? order by timestamp desc
is not supported in Cassandra, since ORDER BY must follow the full clustering key order. And a query like this:
select * from test1 where attribute=? order by partner desc, app desc, timestamp desc
cannot return the global data in the right timestamp-desc order.
After splitting the table we can query the global data correctly: select * from test_global where attribute=? order by timestamp desc.
Now we have a data migration problem.
As far as I know, sstableloader is the easiest way, but it cannot handle a different table name. (Am I right?)
And the COPY command in cqlsh does not fit our situation because our data is too large (10 nodes, and one node holds 400 GB of data).
I also tried the Java API, querying the original table and then inserting into the 3 split tables, but it seems too slow.
Any solution for quick data migration?
TKS!!
PS: Cassandra version: 2.0.15
Thanks & Regards,
qihuang.zheng