Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/4956#discussion_r26271367
--- Diff: docs/streaming-programming-guide.md ---
@@ -1327,6 +1420,178 @@ Note that the connections in the pool should be
lazily created on demand and tim
***
+## DataFrame and SQL Operations
+You can easily use [DataFrames and SQL](sql-programming-guide.html)
operations on streaming data. You have to create a SQLContext using the
SparkContext that the StreamingContext is using. Furthermore, this has to be
done such that it can be restarted on driver failures. This is done by creating
a lazily instantiated singleton instance of SQLContext, as shown in the
following example. It modifies the earlier [word count
example](#a-quick-example) to generate word counts using DataFrames and SQL.
Each RDD is converted to a DataFrame, registered as a temporary table, and then
queried using SQL.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+/** Lazily instantiated singleton instance of SQLContext */
+object SQLContextSingleton {
+ @transient private var instance: SQLContext = null
+
+ // Instantiate SQLContext on demand
+ def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
+ if (instance == null) {
+ instance = new SQLContext(sparkContext)
+ }
+ instance
+ }
+}
+
+...
+
+/** Case class for converting RDD to DataFrame */
+case class Row(word: String)
+
+...
+
+/** DataFrame operations inside your streaming program */
+
+val words: DStream[String] = ...
+
+words.foreachRDD { rdd =>
+
+ // Get the singleton instance of SQLContext
+ val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
+ import sqlContext.implicits._
+
+ // Convert RDD[String] to RDD[case class] to DataFrame
+ val wordsDataFrame = rdd.map(w => Row(w)).toDF()
+
+ // Register as table
+ wordsDataFrame.registerTempTable("words")
+
+ // Do word count on DataFrame using SQL and print it
+ val wordCountsDataFrame =
+ sqlContext.sql("select word, count(*) as total from words group by word")
+ wordCountsDataFrame.show()
+}
+
+{% endhighlight %}
+
+See the full [source
code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala).
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+/** Lazily instantiated singleton instance of SQLContext */
+class JavaSQLContextSingleton {
+ static private transient SQLContext instance = null;
+ static public SQLContext getInstance(SparkContext sparkContext) {
+ if (instance == null) {
+ instance = new SQLContext(sparkContext);
+ }
+ return instance;
+ }
+}
+
+...
+
+/** Java Bean class for converting RDD to DataFrame */
+public class JavaRow implements java.io.Serializable {
+ private String word;
+
+ public String getWord() {
+ return word;
+ }
+
+ public void setWord(String word) {
+ this.word = word;
+ }
+}
+
+...
+
+/** DataFrame operations inside your streaming program */
+
+JavaDStream<String> words = ...
+
+words.foreachRDD(
+ new Function2<JavaRDD<String>, Time, Void>() {
+ @Override
+ public Void call(JavaRDD<String> rdd, Time time) {
+ SQLContext sqlContext = JavaSQLContextSingleton.getInstance(rdd.context());
+
+ // Convert JavaRDD<String> to JavaRDD<JavaRow> (Java Bean) to DataFrame
+ JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
+ public JavaRow call(String word) {
+ JavaRow record = new JavaRow();
+ record.setWord(word);
+ return record;
+ }
+ });
+ DataFrame wordsDataFrame = sqlContext.createDataFrame(rowRDD, JavaRow.class);
+
+ // Register as table
+ wordsDataFrame.registerTempTable("words");
+
+ // Do word count on table using SQL and print it
+ DataFrame wordCountsDataFrame =
+ sqlContext.sql("select word, count(*) as total from words group by word");
+ wordCountsDataFrame.show();
+ return null;
+ }
+ }
+);
+{% endhighlight %}
+
+See the full [source
code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java).
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+
+# Lazily instantiated global instance of SQLContext
+def getSqlContextInstance(sparkContext):
+ if ('sqlContextSingletonInstance' not in globals()):
+ globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
+ return globals()['sqlContextSingletonInstance']
+
+...
+
+# DataFrame operations inside your streaming program
+
+words = ... # DStream of strings
+
+def process(time, rdd):
+ print "========= %s =========" % str(time)
+ try:
+ # Get the singleton instance of SQLContext
+ sqlContext = getSqlContextInstance(rdd.context)
+
+ # Convert RDD[String] to RDD[Row] to DataFrame
+ rowRdd = rdd.map(lambda w: Row(word=w))
+ wordsDataFrame = sqlContext.createDataFrame(rowRdd)
+
+ # Register as table
+ wordsDataFrame.registerTempTable("words")
+
+ # Do word count on table using SQL and print it
+ wordCountsDataFrame = sqlContext.sql("select word, count(*) as total from words group by word")
+ wordCountsDataFrame.show()
+ except:
+ pass
+
+words.foreachRDD(process)
+{% endhighlight %}
+
+See the full [source
code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/sql_network_wordcount.py).
+
+</div>
+</div>
+
+You can also run SQL queries on tables defined on streaming data from a
different thread (that is, asynchronous to the running StreamingContext). Just
make sure that you set the StreamingContext to remember a sufficient amount of
streaming data such that the query can run. Otherwise the StreamingContext,
which is unaware of any asynchronous SQL queries, will delete old streaming
data before the query can complete. For example, if you want to query the last
batch, but your query can take 5 minutes to run, then call
`streamingContext.remember(Minutes(5))` (in Scala, or equivalent in other
languages).
--- End diff --
The only reason I wanted to bring that up is that if people are doing it, they
should call remember(). I am not sure how to clarify this further.
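
One way to picture the concern is with a toy model. The sketch below is not Spark code: `ToyBatchStore` and `batch_survives_query` are hypothetical names, and the 10-second batch interval is an assumption chosen for illustration. It only mimics the behaviour described in the diff, where batches older than the remember window are dropped while a slow query started from another thread is still running.

```python
# Toy model only: this is NOT Spark code. ToyBatchStore and its methods are
# hypothetical names that mimic how old batches are dropped once they fall
# outside the remember window.


class ToyBatchStore:
    def __init__(self, remember_s):
        self.remember_s = remember_s  # how long batches are kept, in seconds
        self.batches = {}             # batch time (s) -> batch data

    def add_batch(self, time_s, data):
        """Store a new batch, then drop batches older than the remember window."""
        self.batches[time_s] = data
        cutoff = time_s - self.remember_s
        for t in [t for t in self.batches if t < cutoff]:
            del self.batches[t]

    def read(self, time_s):
        """Return the batch data, or None if it has already been dropped."""
        return self.batches.get(time_s)


def batch_survives_query(remember_s, batch_interval_s=10, query_duration_s=300):
    """Start a slow query on the batch at t=0 and report whether that batch is
    still readable when the query finishes."""
    store = ToyBatchStore(remember_s)
    store.add_batch(0, ["a", "b", "a"])
    # The stream keeps producing batches while the slow query is running.
    for t in range(batch_interval_s, query_duration_s + 1, batch_interval_s):
        store.add_batch(t, ["c"])
    return store.read(0) is not None


print(batch_survives_query(remember_s=10))   # False: batch dropped mid-query
print(batch_survives_query(remember_s=300))  # True: window covers the query
```

With a window shorter than the query, the batch is gone before the query finishes. With a window of 5 minutes, matching the `remember(Minutes(5))` call in the diff, it is still there.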
TD
On Wed, Mar 11, 2015 at 3:32 PM, Josh Rosen <[email protected]>
wrote:
> In docs/streaming-programming-guide.md
> <https://github.com/apache/spark/pull/4956#discussion_r26262942>:
>
> > +
> > + # Do word count on table using SQL and print it
> > + wordCountsDataFrame = sqlContext.sql("select word, count(*) as total from words group by word")
> > + wordCountsDataFrame.show()
> > + except:
> > + pass
> > +
> > +words.foreachRDD(process)
> > +{% endhighlight %}
> > +
> > +See the full [source
code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/sql_network_wordcount.py).
> > +
> > +</div>
> > +</div>
> > +
> > +You can also run SQL queries on tables defined on streaming data from
a different thread (that is, asynchronous to the running StreamingContext).
Just make sure that you set the StreamingContext to remember a sufficient
amount of streaming data such that the query can run. Otherwise the
StreamingContext, which is unaware of any asynchronous SQL queries, will delete
old streaming data before the query can complete. For example, if you want to
query the last batch, but your query can take 5 minutes to run, then call
`streamingContext.remember(Minutes(5))` (in Scala, or equivalent in other
languages).
>
> Is it necessary to bring up the "separate thread" thing here? It seems a
> bit confusing to me because it's already the user's choice of where / when
> to do the blocking on StreamingContext.awaitTermination() since the call
> to StreamingContext.start() isn't blocking.
>
> I guess my confusion is that users can't directly run things in the
> StreamingContext's thread and the blocking is taking place in the user's
> calling thread anyways.
>
> Reply to this email directly or view it on GitHub
> <https://github.com/apache/spark/pull/4956/files#r26262942>.
>