Thank you Helena,

But I would like to explain my problem space:


The output is supposed to be Cassandra. To achieve that, I have to use 
spark-cassandra-connecter APIs.

So going in a botton-up approach, to write to cassandra, I have to use:

javaFunctions(<JavaRDD object> rdd, 
TestTable.class).saveToCassandra("testkeyspace", "test_table");

To use the above function javaFunctions, I need to obtain the JavaRDD object, 
using the sc.parallelize() like this:

JavaRDD<TestTable> rdd = sc.parallelize(list);

The above sc.parallelize(list) accepts List object as a parameter.

The above List object will contain the data obtained either from JavaDStream or 
JavaPairReceiverDStream, that has the streaming data.


So the flow is:

I need:

1.  JavaDStream data to get mapped into List.

2.  The above List object to be passed to sc.parallelize(list) to obtain 
JavaRDD object.

3.  The above JavaRDD object to be passed to javaFunctions().saveToCassandra().


For all this I need a code that maps my JavaDStream data into List.

Once step 1 is done, steps 2 and 3 can easily be performed.


I need help for step 1.

I have written the code below to do it:

JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream",conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));
JavaPairReceiverInputDStream<String, String> messages = 
KafkaUtils.createStream(jssc, args[0], args[1], topicMap );
System.out.println("Connection done!");

/* connection to cassandra */
CassandraConnector connector = CassandraConnector.apply(sc.getConf());
Session session = connector.openSession();
session.execute("CREATE TABLE testkeyspace.test_table (key TEXT PRIMARY KEY, 
value TEXT)");
List<TestTable> list = new ArrayList<TestTable>();
messages.foreachRDD(new Function<Tuple2<String,String>, Void>()
{
    public Void call(Tuple2<String,String> message)
    {
String k = message._1();
String v = message._2();
TestTable tbl = new TestTable(k,v);
list.add(tbl);
return null;
    }
}
jssc.start();
jssc.awaitTermination(new Duration(60* 1000));


 It would be great help if a  way is suggested to map the JavaDStream to List.


Thanks and Regards,

Md. Aiman Sarosh.
Accenture Services Pvt. Ltd.
Mob #:  (+91) - 9836112841.
________________________________
From: Helena Edelson <helena.edel...@datastax.com>
Sent: Friday, December 5, 2014 9:12 PM
To: Sarosh, M.
Cc: user
Subject: Re: Spark-Streaming: output to cassandra

I think what you are looking for is something like:

JavaRDD<Double> pricesRDD = javaFunctions(sc).cassandraTable("ks", "tab", 
mapColumnTo(Double.class)).select("price");
JavaRDD<Person> rdd = javaFunctions(sc).cassandraTable("ks", "people", 
mapRowTo(Person.class));

noted here: 
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/7_java_api.md

?

- Helena
@helenaedelson

On Dec 5, 2014, at 10:15 AM, 
<m.sar...@accenture.com<mailto:m.sar...@accenture.com>> 
<m.sar...@accenture.com<mailto:m.sar...@accenture.com>> wrote:

Hi Akhil, Vyas, Helena,

Thank you for your suggestions.

As Akhil suggested earlier, i have implemented the batch Duration into 
JavaStreamingContext and waitForTermination(Duration).
The approach Helena suggested is Scala oriented.

But the issue now is that I want to set Cassandra as my output.
I have created a table in cassandra "test_table" with columns "key:text primary 
key" and "value:text"
I have mapped the data successfully into JavaDStream<Tuple2<String,String>> 
data :

JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream",conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));
JavaPairReceiverInputDStream<String, String> messages = 
KafkaUtils.createStream(jssc, args[0], args[1], topicMap );
JavaDStream<Tuple2<String,String>> data = messages.map(new Function< 
Tuple2<String,String>, Tuple2<String,String> >()
{
public Tuple2<String,String> call(Tuple2<String, String> message)
{
return new Tuple2<String,String>( message._1(), message._2() );
}
}
);

Then I have created a List:
List<TestTable> list = new ArrayList<TestTable>();
where TestTable is my custom class having the same structure as my Cassandra 
table, with members "key" and "value":
class TestTable
{
String key;
String val;
public TestTable() {}
public TestTable(String k, String v)
{
key=k;
val=v;
}
public String getKey(){
return key;
}
public void setKey(String k){
key=k;
}
public String getVal(){
return val;
}
public void setVal(String v){
val=v;
}
public String toString(){
return "Key:"+key+",Val:"+val;
}
}

Please suggest a way how to I add the data from 
JavaDStream<Tuple2<String,String>> data into the List<TestTable> list.
I am doing this so that I can subsequently use
JavaRDD<TestTable> rdd = sc.parallelize(list);
javaFunctions(rdd, TestTable.class).saveToCassandra("testkeyspace", 
"test_table");
to save the RDD data into Cassandra.

I had tried coding this way:
 messages.foreachRDD(new Function<Tuple2<String,String>, String>()
{
public List<TestTable> call(Tuple2<String,String> message)
{
String k = message._1();
String v = message._2();
TestTable tbl = new TestTable(k,v);
list.put(tbl);
}
}
);
but seems some type mis-match happenning.
Please help.





Thanks and Regards,

Md. Aiman Sarosh.
Accenture Services Pvt. Ltd.
Mob #:  (+91) - 9836112841.
________________________________
From: Helena Edelson 
<helena.edel...@datastax.com<mailto:helena.edel...@datastax.com>>
Sent: Friday, December 5, 2014 6:26 PM
To: Sarosh, M.
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: Spark-Streaming: output to cassandra

You can just do

You can just do something like this, the Spark Cassandra Connector handles the 
rest


KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map(KafkaTopicRaw -> 10), StorageLevel.DISK_ONLY_2)
  .map { case (_, line) => line.split(",")}
  .map(RawWeatherData(_))
  .saveToCassandra(CassandraKeyspace, CassandraTableRaw)

- Helena
@helenaedelson

On Dec 4, 2014, at 9:51 AM, 
m.sar...@accenture.com<mailto:m.sar...@accenture.com> wrote:


Hi,


I have written the code below which is streaming data from kafka, and printing 
to the console.

I want to extend this, and want my data to go into Cassandra table instead.


JavaStreamingContext jssc = new JavaStreamingContext("local[4]", "SparkStream", 
new Duration(1000));
JavaPairReceiverInputDStream<String, String> messages = 
KafkaUtils.createStream(jssc, args[0], args[1], topicMap );

System.out.println("Connection done!");
JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, 
String>()
{
public String call(Tuple2<String, String> message)
{
return message._2();
}
}
);
//data.print();   --> output to console
data.foreachRDD(saveToCassandra("mykeyspace","mytable"));
jssc.start();
jssc.awaitTermination();



How should I implement the line:

data.foreachRDD(saveToCassandra("mykeyspace","mytable"));?

so that data goes into Cassandra, in each batch.  And how do I specify a batch, 
because if i do Ctrl+C on the console of streaming-job-jar, nothing will be 
entered into cassandra for sure since it is getting killed.


Please help.


Thanks and Regards,

Md. Aiman Sarosh.
Accenture Services Pvt. Ltd.
Mob #:  (+91) - 9836112841.

________________________________

This message is for the designated recipient only and may contain privileged, 
proprietary, or otherwise confidential information. If you have received it in 
error, please notify the sender immediately and delete the original. Any other 
use of the e-mail by you is prohibited. Where allowed by local law, electronic 
communications with Accenture and its affiliates, including e-mail and instant 
messaging (including content), may be scanned by our systems for the purposes 
of information security and assessment of internal compliance with Accenture 
policy.
______________________________________________________________________________________

www.accenture.com<http://www.accenture.com/>


Reply via email to