You could use the "withSessionDo" method of the Spark Cassandra Connector's CassandraConnector to perform the simple insert:

CassandraConnector(conf).withSessionDo { session => session.execute(....) }

-Todd

On Tue, Feb 16, 2016 at 11:01 AM, Cody Koeninger <c...@koeninger.org> wrote:

> You could use sc.parallelize... but the offsets are already available at
> the driver, and they're a (hopefully) small enough amount of data that
> it's probably more straightforward to just use the normal cassandra client
> to save them from the driver.
>
> On Tue, Feb 16, 2016 at 1:15 AM, Abhishek Anand <abhis.anan...@gmail.com>
> wrote:
>
>> I have a kafka rdd and I need to save the offsets to a cassandra table at
>> the beginning of each batch.
>>
>> Basically I need to write the offsets of the type OffsetRange[] below,
>> which I am getting inside foreachRDD, to cassandra. The javaFunctions api
>> to write to cassandra needs an rdd. How can I create an rdd from the
>> offsets and write it to a cassandra table?
>>
>>
>> public static void writeOffsets(JavaPairDStream<String,
>>         String> kafkastream) {
>>     kafkastream.foreachRDD((rdd, batchMilliSec) -> {
>>         OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>>         return null;
>>     });
>> }
>>
>>
>> Thanks !!
>> Abhi
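
To make the driver-side approach discussed in this thread a bit more concrete, here is a rough sketch in plain Java. It is only an illustration under several assumptions: the Range class is a stand-in for Spark's OffsetRange (so the sketch compiles without Spark), the kafka_offsets table and its column names are hypothetical, and the Consumer<String> callback stands in for whatever actually runs the statement, such as session.execute inside withSessionDo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Consumer;

public class OffsetWriterSketch {

    // Stand-in for Spark's OffsetRange (hypothetical; keeps the sketch free of Spark dependencies).
    static final class Range {
        final String topic;
        final int partition;
        final long fromOffset;
        final long untilOffset;

        Range(String topic, int partition, long fromOffset, long untilOffset) {
            this.topic = topic;
            this.partition = partition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }
    }

    // Builds one INSERT statement per offset range.
    // Table and column names are assumptions made for this example.
    static String insertCql(Range r) {
        return String.format(Locale.ROOT,
            "INSERT INTO kafka_offsets (topic, kafka_partition, from_offset, until_offset) "
                + "VALUES ('%s', %d, %d, %d)",
            r.topic.replace("'", "''"), // escape single quotes in the string literal
            r.partition, r.fromOffset, r.untilOffset);
    }

    // Runs entirely on the driver: no RDD is created, each statement is handed
    // directly to the supplied callback.
    static void writeOffsets(Range[] offsets, Consumer<String> execute) {
        for (Range r : offsets) {
            execute.accept(insertCql(r));
        }
    }

    public static void main(String[] args) {
        Range[] offsets = {
            new Range("events", 0, 100L, 250L),
            new Range("events", 1, 90L, 240L)
        };
        // Collect the statements instead of sending them to a real cluster.
        List<String> sent = new ArrayList<>();
        writeOffsets(offsets, sent::add);
        sent.forEach(System.out::println);
    }
}
```

In a real job the loop would sit inside foreachRDD right after offsetRanges() is read, and the callback would hand each statement to the Cassandra session. A prepared statement with bound values would probably be preferable to building CQL strings by hand.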