BTW, in Spark 1.4, announced today, I added SQLContext.getOrCreate, so you don't need to create the singleton yourself.
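For reference, a minimal sketch of what that could look like inside foreachRDD (assuming a Spark version that provides SQLContext.getOrCreate; the dddstream/ttable names simply mirror the example further down this thread):

import org.apache.spark.sql.SQLContext

// For each micro-batch, fetch (or lazily create) the shared SQLContext from
// the RDD's SparkContext instead of maintaining a hand-rolled singleton.
dddstream.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    sqlContext.jsonRDD(rdd).registerTempTable("ttable")
    sqlContext.sql("SELECT COUNT(*) FROM ttable").foreach(row => println(row(0)))
  }
}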
On Wed, Jun 10, 2015 at 3:21 AM, Sergio Jiménez Barrio <drarse.a...@gmail.com> wrote:

> Note: CCing user@spark.apache.org
>
> First, you must check whether the RDD is empty:
>
> messages.foreachRDD { rdd =>
>   if (!rdd.isEmpty) { ... }
> }
>
> Now you can obtain the instance of a SQLContext:
>
> val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
>
> *Optional*
> At this point I like to work with DataFrames, so I convert the RDD to a DataFrame. I see that you receive JSON:
>
> val df: DataFrame = sqlContext.jsonRDD(rdd, getSchema(getSchemaStr)).toDF()
>
> My getSchema function creates a schema for my JSON:
>
> def getSchemaStr(): String = "feature1 feature2 ..."
>
> def getSchema(schema: String): StructType =
>   StructType(schema.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
>
> I hope this helps.
>
> Regards.
>
> On 2015-06-09 at 17:36 GMT+02:00, codingforfun [via Apache Spark User List] <ml-node+s1001560n23226...@n3.nabble.com> wrote:
>
>> I don't know why. You said “Why? I tried this solution and works fine.” Does that mean your SQLContext instance stays alive for the whole streaming application's lifetime, rather than for one batch duration? My code is below:
>>
>> object SQLContextSingleton extends java.io.Serializable {
>>   @transient private var instance: SQLContext = null
>>
>>   // Instantiate SQLContext on demand
>>   def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
>>     if (instance == null) {
>>       instance = new SQLContext(sparkContext)
>>     }
>>     instance
>>   }
>> }
>>
>> // type_->typex, id_->id, url_->url
>> case class dddd(time: Timestamp, id: Int, openfrom: Int, tab: Int) extends Serializable
>> case class Count(x: Int)
>>
>> @transient val ssc = new StreamingContext(sc, new Duration(5 * 1000))
>> ssc.checkpoint(".")
>>
>> val kafkaParams = Map("metadata.broker.list" -> "10.20.30.40:9092")
>> @transient val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("topic_name"))
>> @transient val dddstream = dstream.map(x => x._2).flatMap(x => x.split("\n"))
>>
>> dddstream.foreachRDD { rdd =>
>>   SQLContextSingleton.getInstance(rdd.sparkContext).jsonRDD(rdd).registerTempTable("ttable")
>>   val ret = SQLContextSingleton.getInstance(rdd.sparkContext).sql("SELECT COUNT(*) FROM ttable")
>>   ret.foreach { x => println(x(0)) }
>> }
>>
>> ssc.start()
>> ssc.awaitTermination()
>>
>> On 2015-06-09 at 17:41:44, drarse [via Apache Spark User List] wrote:
>>
>> Why? I tried this solution and it works fine.
>>
>> On Tuesday, June 9, 2015, codingforfun [via Apache Spark User List] wrote:
>>
>>> Hi drarse, thanks for replying. The way you suggested, using a singleton object, does not work.
>>>
>>> On 2015-06-09 at 16:24:25, drarse [via Apache Spark User List] wrote:
>>>
>>> The best way is to create a singleton object like:
>>>
>>> object SQLContextSingleton {
>>>   @transient private var instance: SQLContext = null
>>>
>>>   // Instantiate SQLContext on demand
>>>   def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
>>>     if (instance == null) {
>>>       instance = new SQLContext(sparkContext)
>>>     }
>>>     instance
>>>   }
>>> }
>>>
>>> You have more information in the programming guide:
>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations
>>>
>>> On 2015-06-09 at 9:27 GMT+02:00, codingforfun [via Apache Spark User List] wrote:
>>>
>>>> I used a SQLContext in a Spark Streaming application as below:
>>>>
>>>> ----------------------------------------------------------------
>>>>
>>>> case class topic_name(f1: Int, f2: Int)
>>>>
>>>> val sqlContext = new SQLContext(sc)
>>>> @transient val ssc = new StreamingContext(sc, new Duration(5 * 1000))
>>>> ssc.checkpoint(".")
>>>> val theDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("topic_name"))
>>>>
>>>> theDStream.map(x => x._2).foreachRDD { rdd =>
>>>>   sqlContext.jsonRDD(rdd).registerTempTable("topic_name")
>>>>   sqlContext.sql("select count(*) from topic_name").foreach { x =>
>>>>     WriteToFile("file_path", x(0).toString)
>>>>   }
>>>> }
>>>>
>>>> ssc.start()
>>>> ssc.awaitTermination()
>>>> ----------------------------------------------------------------
>>>>
>>>> I found I could only get the message count for each 5-second batch, because "The lifetime of this temporary table is tied to the SQLContext that was used to create this DataFrame". I guess a new sqlContext is created every 5 seconds and the temporary table only lives for those 5 seconds. I want the sqlContext and the temporary table to stay alive for the whole streaming application's life cycle. How can I do that?
>>>>
>>>> Thanks~
>>
>> --
>> Regards,
>> Sergio Jiménez