Re: KafkaProducer using Cassandra as source
Hi Kali,

If you do not mind sending JSON, you could do something like this, using json4s:

    val rows = p.collect().map(row => TestTable(row.getString(0), row.getString(1)))
    val json = parse(write(rows))
    producer.send(new KeyedMessage[String, String]("trade", writePretty(json)))

    // or, to send each entry individually:
    for (row <- rows) {
      producer.send(new KeyedMessage[String, String]("trade", writePretty(parse(write(row)))))
    }

Just make sure you import the following, and bring an implicit Formats into scope (json4s needs it to serialize case classes):

    import org.json4s._
    import org.json4s.jackson.JsonMethods._
    import org.json4s.native.Serialization
    import org.json4s.native.Serialization.{read, write, writePretty}

    implicit val formats: Formats = Serialization.formats(NoTypeHints)

On Wed, Sep 23, 2015 at 12:26 PM, kali.tumm...@gmail.com <kali.tumm...@gmail.com> wrote:

> Guys sorry I figured it out.
>
> val x = p.collect().mkString("\n").replace("[","").replace("]","").replace(",","~")
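For reference, a minimal self-contained sketch of the json4s round-trip suggested above (assuming json4s-native is on the classpath; TestTable mirrors the case class used elsewhere in this thread):

```scala
import org.json4s._
import org.json4s.native.Serialization
import org.json4s.native.Serialization.{write, writePretty}

object Json4sSketch {
  // json4s requires an implicit Formats to (de)serialize case classes
  implicit val formats: Formats = Serialization.formats(NoTypeHints)

  case class TestTable(TRADE_ID: String, TRADE_PRICE: String)

  def main(args: Array[String]): Unit = {
    val rows = Seq(TestTable("1", "100.5"), TestTable("2", "99.75"))
    // Compact JSON array of all rows, e.g. [{"TRADE_ID":"1","TRADE_PRICE":"100.5"},...]
    println(write(rows))
    // Pretty-printed JSON for a single row
    println(writePretty(rows.head))
  }
}
```

The resulting strings can be passed straight to KeyedMessage as in the snippet above.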
Re: KafkaProducer using Cassandra as source
Guys sorry I figured it out.

    val x = p.collect().mkString("\n").replace("[","").replace("]","").replace(",","~")

Full Code:-

    package com.examples

    /**
     * Created by kalit_000 on 22/09/2015.
     */

    import java.util.Properties

    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark._

    object SparkProducerDBCassandra {

      case class TestTable(TRADE_ID: String, TRADE_PRICE: String)

      def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.WARN)
        Logger.getLogger("akka").setLevel(Level.WARN)

        val conf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("testkali2")
          .set("spark.cassandra.connection.host", "127.0.0.1")
        val sc = new SparkContext(conf)

        print("Test kali Spark Cassandra")

        val cc = new org.apache.spark.sql.cassandra.CassandraSQLContext(sc)
        val p = cc.sql("select * from people.person")
        p.collect().foreach(println)

        val props: Properties = new Properties()
        props.put("metadata.broker.list", "localhost:9092")
        props.put("serializer.class", "kafka.serializer.StringEncoder")

        val config = new ProducerConfig(props)
        val producer = new Producer[String, String](config)

        val x = p.collect().mkString("\n").replace("[", "").replace("]", "").replace(",", "~")
        producer.send(new KeyedMessage[String, String]("trade", x))
      }
    }

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/KafkaProducer-using-Cassandra-as-source-tp24774p24788.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
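A less brittle variant of the replace() trick above (a sketch, not from the thread): build each line from the row's column values directly instead of rewriting Row.toString, and send one Kafka message per row so the consumer receives 2~Joe~Smith style records. This assumes the same `p` and `producer` as in the code above:

```scala
// Sketch: format each Row from cc.sql(...) without string-replace on its toString
val lines = p.collect().map { row =>
  // Row.toSeq yields the column values; join them with the desired delimiter
  row.toSeq.mkString("~")
}

// One message per record, rather than one big newline-joined payload
lines.foreach { line =>
  producer.send(new KeyedMessage[String, String]("trade", line))
}
```

Sending per-record messages also lets the consumer process rows independently instead of splitting a single payload on "\n".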
KafkaProducer using Cassandra as source
Hi All,

I am a newbie in Spark. I managed to write a Kafka producer in Spark where the data comes from a Cassandra table, but I have a few questions.

The Spark output from Cassandra looks like this:

    [2,Joe,Smith]
    [1,Barack,Obama]

I would like my Kafka consumer to receive something like the following, so I need to remove the [] at the beginning and end:

    2~Joe~Smith
    1~Barack~Obama

Also, when I collect the RDD and use mkString(","), the two lines of data get combined into one (e.g. [2,Joe,Smith][1,Barack,Obama]), so I used mkString("\n"). Now the data looks like

    [2,Joe,Smith]
    [1,Barack,Obama]

but I still need my Kafka consumer to receive it like this. Any ideas?

    2~Joe~Smith
    1~Barack~Obama

Spark Code:-

    package com.examples

    /**
     * Created by kalit_000 on 22/09/2015.
     */

    import java.util.Properties

    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark._

    object SparkProducerDBCassandra {

      case class TestTable(TRADE_ID: String, TRADE_PRICE: String)

      def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.WARN)
        Logger.getLogger("akka").setLevel(Level.WARN)

        val conf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("testkali2")
          .set("spark.cassandra.connection.host", "127.0.0.1")
        val sc = new SparkContext(conf)

        print("Test kali Spark Cassandra")

        val cc = new org.apache.spark.sql.cassandra.CassandraSQLContext(sc)
        val p = cc.sql("select * from people.person")
        p.collect().foreach(println)

        val props: Properties = new Properties()
        props.put("metadata.broker.list", "localhost:9092")
        props.put("serializer.class", "kafka.serializer.StringEncoder")

        val config = new ProducerConfig(props)
        val producer = new Producer[String, String](config)

        val x = p.collect().mkString("\n")
        producer.send(new KeyedMessage[String, String]("trade", x))
      }
    }

Thanks
Sri

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/KafkaProducer-using-Cassandra-as-source-tp24774.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.