You could use the "withSessionDo" of the SparkCassandrConnector to preform
the simple insert:

CassandraConnector(conf).withSessionDo { session => session.execute(....) }

-Todd

On Tue, Feb 16, 2016 at 11:01 AM, Cody Koeninger <c...@koeninger.org> wrote:

> You could use sc.parallelize... but the offsets are already available at
> the driver, and they're a (hopefully) small enough amount of data that's
> it's probably more straightforward to just use the normal cassandra client
> to save them from the driver.
>
> On Tue, Feb 16, 2016 at 1:15 AM, Abhishek Anand <abhis.anan...@gmail.com>
> wrote:
>
>> I have a kafka rdd and I need to save the offsets to cassandra table at
>> the begining of each batch.
>>
>> Basically I need to write the offsets of the type Offsets below that I am
>> getting inside foreachRD, to cassandra. The javafunctions api to write to
>> cassandra needs a rdd. How can I create a rdd from offsets and write to
>> cassandra table.
>>
>>
>> public static void writeOffsets(JavaPairDStream<String,
>> String> kafkastream){
>> kafkastream.foreachRDD((rdd,batchMilliSec) -> {
>> OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>> return null;
>> });
>>
>>
>> Thanks !!
>> Abhi
>>
>>
>>
>

Reply via email to