Re: [Structured Streaming Query] Calculate Running Avg from Kafka feed using SQL query

2018-04-09 Thread Aakash Basu
Hey Felix,

I've already tried with

.format("memory")
   .queryName("tableName")

but it still doesn't work for the second query. It just stalls the
program, waiting for new data for the first query.

Here's my code -

from pyspark.sql import SparkSession
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

# socket source takes only host/port; "header" and a load path are CSV options
data = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9998) \
    .load()

id_DF = data.select(split(data.value, ",").getItem(0).alias("col1"),
                    split(data.value, ",").getItem(1).alias("col2"))

id_DF.createOrReplaceTempView("ds")

df = spark.sql("select avg(col1) as aver from ds")

query2 = df \
    .writeStream \
    .format("memory") \
    .queryName("ABCD") \
    .outputMode("complete") \
    .trigger(processingTime='5 seconds') \
    .start()

wordCounts = spark.sql("select col1, col2, col2/(select aver from ABCD) as col3 from ds")

query = wordCounts \
    .writeStream \
    .format("console") \
    .trigger(processingTime='5 seconds') \
    .start()

spark.streams.awaitAnyTermination()


Here's my data -

1,2
3,4
5,6
7,8
9,10
11,12
13,14


What do you think the problem may be?


Thanks in advance,
Aakash.
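
A possible cause worth checking: each streaming query builds its own instance
of the socket source, so the two queries above open two separate connections
to port 9998, and a simple netcat server may feed only one of them. Below is a
minimal sketch of an alternative that needs only one streaming query; it
assumes Spark 2.4+ for foreachBatch, and note that the average it computes is
per micro-batch, not a running average over all data seen so far:

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, avg

spark = SparkSession.builder.appName("RunningAvgSketch").getOrCreate()

lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9998) \
    .load()

parsed = lines.select(
    split(lines.value, ",").getItem(0).cast("double").alias("col1"),
    split(lines.value, ",").getItem(1).cast("double").alias("col2"))

def divide_by_avg(batch_df, batch_id):
    # batch_df is an ordinary (non-streaming) DataFrame, so collect() is allowed
    aver = batch_df.agg(avg("col1")).collect()[0][0]
    if aver is not None:
        batch_df.withColumn("col3", batch_df.col2 / aver).show()

query = parsed.writeStream \
    .foreachBatch(divide_by_avg) \
    .trigger(processingTime='5 seconds') \
    .start()

query.awaitTermination()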

On Fri, Apr 6, 2018 at 9:55 PM, Felix Cheung wrote:

> Instead of writing to the console, you need to write to memory for it to
> be queryable.
>
>
>   .format("memory")
>   .queryName("tableName")
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks
>
> --
> *From:* Aakash Basu 
> *Sent:* Friday, April 6, 2018 3:22:07 AM
> *To:* user
> *Subject:* Fwd: [Structured Streaming Query] Calculate Running Avg from
> Kafka feed using SQL query
>
> Any help?
>
> Need urgent help. Could someone please clarify this doubt?
>
>
> -- Forwarded message --
> From: Aakash Basu 
> Date: Mon, Apr 2, 2018 at 1:01 PM
> Subject: [Structured Streaming Query] Calculate Running Avg from Kafka
> feed using SQL query
> To: user, "Bowden, Chris" <chris.bow...@microfocus.com>
>
>
> Hi,
>
> This is a very interesting requirement, where I am getting stuck at a few
> places.
>
> *Requirement* -
>
> Col1    Col2
> 1       10
> 2       11
> 3       12
> 4       13
> 5       14
>
> *I have to calculate the avg of Col1 and then divide each row of Col2 by
> that avg. And the avg should be updated with every new batch of data fed
> through Kafka into Spark Streaming.*
>
> *Avg(Col1) = Running Avg *
> *Col2 = Col2/Avg(Col1)*
>
>
> *Queries* *-*
>
>
> *1) I am currently trying to run an inner query inside an outer query, to
> print the avg alongside the other column values and do the calculation
> later. But I am getting an error.*
>
> Query -
>
> select t.Col2 , (Select AVG(Col1) as Avg from transformed_Stream_DF) as myAvg 
> from transformed_Stream_DF t
>
> Error -
>
> pyspark.sql.utils.StreamingQueryException: u'Queries with streaming
> sources must be executed with writeStream.start();
>
> Even though I already have writeStream.start() in my code, it is probably
> throwing the error because of the inner select query (I think Spark treats
> it as another query altogether, which requires its own
> writeStream.start()). Any help?
>
>
> *2) How to go about it? *Another idea I have in mind is to query the table
> to get the avg and store it in a variable, then simply pass the variable
> into the second query and divide the second column by it to produce the
> appropriate result. But is that the right approach?
>
> *3) Final question*: How do I do the calculation over the entire data and
> not just the latest batch? Do I need to keep appending somewhere and
> repeatedly use it? My average, and hence every row of Col2, should change
> with each new batch of incoming data.
>
>
> *Code -*
>
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import split, col
>
> spark = SparkSession.builder \
>     .appName("Stream_Col_Oper_Spark") \
>     .getOrCreate()
>
> data = spark.readStream.format("kafka") \
>     .option("startingOffsets", "latest") \
>     .option("kafka.bootstrap.servers", "localhost:9092") \
>     .option("subscribe", "test1") \
>     .load()
>
> ID = data.select('value') \
>     .withColumn('value', data.value.cast("string")) \
>     .withColumn("Col1", split(col("value"), ",").getItem(0)) \
>     .withColumn("Col2", split(col("value"), ",").getItem(1)) \
>     .drop('value')
>
> ID.createOrReplaceTempView("transformed_Stream_DF")
> aggregate_func = spark.sql(
>     "select t.Col2, (select AVG(Col1) as Avg from transformed_Stream_DF) "
>     "as myAvg from transformed_Stream_DF t")  # (Col2/(AVG(Col1)) as Col3)
>
> # ---For Console Print---
>
> query = aggregate_func \
>     .writeStream \
>     .format("console") \
>     .start()
> # .outputMode("complete") \
> # ---Console Print ends---
>
> query.awaitTermination()

Re: [Structured Streaming Query] Calculate Running Avg from Kafka feed using SQL query

2018-04-06 Thread Felix Cheung
Instead of writing to the console, you need to write to memory for it to be queryable.


.format("memory")
.queryName("tableName")
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks
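
A minimal sketch of that pattern, assuming df is the streaming DataFrame
holding the aggregate (the names here are illustrative):

query = df.writeStream \
    .format("memory") \
    .queryName("tableName") \
    .outputMode("complete") \
    .start()

# the in-memory table is kept up to date by the streaming query and can be
# read back with ordinary batch SQL:
spark.sql("select * from tableName").show()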


From: Aakash Basu 
Sent: Friday, April 6, 2018 3:22:07 AM
To: user
Subject: Fwd: [Structured Streaming Query] Calculate Running Avg from Kafka 
feed using SQL query

Any help?

Need urgent help. Could someone please clarify this doubt?


-- Forwarded message --
From: Aakash Basu
Date: Mon, Apr 2, 2018 at 1:01 PM
Subject: [Structured Streaming Query] Calculate Running Avg from Kafka feed 
using SQL query
To: user, "Bowden, Chris" <chris.bow...@microfocus.com>


Hi,

This is a very interesting requirement, where I am getting stuck at a few 
places.

Requirement -

Col1    Col2
1       10
2       11
3       12
4       13
5       14

I have to calculate the avg of Col1 and then divide each row of Col2 by that
avg. And the avg should be updated with every new batch of data fed through
Kafka into Spark Streaming.

Avg(Col1) = Running Avg
Col2 = Col2/Avg(Col1)
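
(For example, over the five rows above, Avg(Col1) = (1+2+3+4+5)/5 = 3, so the
first output row would be 10/3 ≈ 3.33; both the average and every Col2 result
are recomputed as new rows arrive.)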


Queries -


1) I am currently trying to run an inner query inside an outer query, to print
the avg alongside the other column values and do the calculation later. But I
am getting an error.

Query -

select t.Col2 , (Select AVG(Col1) as Avg from transformed_Stream_DF) as myAvg 
from transformed_Stream_DF t

Error -

pyspark.sql.utils.StreamingQueryException: u'Queries with streaming sources 
must be executed with writeStream.start();

Even though I already have writeStream.start() in my code, it is probably
throwing the error because of the inner select query (I think Spark treats it
as another query altogether, which requires its own writeStream.start()). Any
help?


2) How to go about it? Another idea I have in mind is to query the table to
get the avg and store it in a variable, then simply pass the variable into the
second query and divide the second column by it to produce the appropriate
result. But is that the right approach?

3) Final question: How do I do the calculation over the entire data and not
just the latest batch? Do I need to keep appending somewhere and repeatedly
use it? My average, and hence every row of Col2, should change with each new
batch of incoming data.
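
(A note on question 3: a streaming aggregation run with complete output mode
is maintained over all data received so far, so the average itself should not
need manual appending. A minimal sketch, reusing the ID DataFrame defined in
the code below; the cast is an assumption, since Col1 arrives as a string:

from pyspark.sql.functions import avg, col

# running average over everything received so far, not just the last batch
running_avg = ID.agg(avg(col("Col1").cast("double")).alias("Avg"))

q = running_avg.writeStream \
    .format("console") \
    .outputMode("complete") \
    .start()
)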


Code -


from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col

spark = SparkSession.builder \
    .appName("Stream_Col_Oper_Spark") \
    .getOrCreate()

data = spark.readStream.format("kafka") \
    .option("startingOffsets", "latest") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test1") \
    .load()

ID = data.select('value') \
    .withColumn('value', data.value.cast("string")) \
    .withColumn("Col1", split(col("value"), ",").getItem(0)) \
    .withColumn("Col2", split(col("value"), ",").getItem(1)) \
    .drop('value')

ID.createOrReplaceTempView("transformed_Stream_DF")
aggregate_func = spark.sql(
    "select t.Col2, (select AVG(Col1) as Avg from transformed_Stream_DF) "
    "as myAvg from transformed_Stream_DF t")  # (Col2/(AVG(Col1)) as Col3)

# ---For Console Print---

query = aggregate_func \
    .writeStream \
    .format("console") \
    .start()
# .outputMode("complete") \
# ---Console Print ends---

query.awaitTermination()

# /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit
# --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
# /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py



Thanks,
Aakash.



Re: [Structured Streaming Query] Calculate Running Avg from Kafka feed using SQL query

2018-04-02 Thread Aakash Basu
Hi all,

The following is the updated code, where I'm getting the avg in a DF; but
collect(), which should store the value in a variable so I can pass it to the
final select query, is not working. So avg is currently a DataFrame, not a
variable with the value stored in it.

New code -

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col

spark = SparkSession.builder \
    .appName("Stream_Col_Oper_Spark") \
    .getOrCreate()

data = spark.readStream.format("kafka") \
    .option("startingOffsets", "latest") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test1") \
    .load()

ID = data.select('value') \
    .withColumn('value', data.value.cast("string")) \
    .withColumn("Col1", split(col("value"), ",").getItem(0)) \
    .withColumn("Col2", split(col("value"), ",").getItem(1)) \
    .drop('value')

ID.createOrReplaceTempView("transformed_Stream_DF")
avg = spark.sql("select AVG(Col1) as Avg from transformed_Stream_DF")  # .collect()[0][0]

# NOTE: avg is a DataFrame here, so str(avg) gets interpolated below,
# not the numeric value
aggregate_func = spark.sql(
    "select Col1, Col2, Col2/{0} as Col3 from "
    "transformed_Stream_DF".format(avg))  # (Col2/(AVG(Col1)) as Col3)

# ---For Console Print---
query1 = avg \
    .writeStream \
    .format("console") \
    .outputMode("complete") \
    .start()

query = aggregate_func \
    .writeStream \
    .format("console") \
    .start()
# .outputMode("complete") \
# ---Console Print ends---

query1.awaitTermination()

# /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit
# --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
# /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py


If I uncomment the collect() in the above code and use it, I get the
following error -

*pyspark.sql.utils.AnalysisException: u'Queries with streaming sources must
be executed with writeStream.start();;\nkafka'*

Any alternative (better) solution to get this job done would suffice too.

Any help shall be greatly acknowledged.

Thanks,
Aakash.
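
One possible direction, sketched under stated assumptions (the table name
avg_table and the sleep interval are illustrative, and this is untested
against the setup above): write the average to a memory sink, then read it
back with a batch query, where collect() is allowed, and interpolate that
plain Python value into the second query. The caveat is that the value is
frozen at the moment it is read, so the batch read has to be repeated to
refresh it.

import time

# keep the running average in an in-memory table, updated by the stream
avg_query = avg.writeStream \
    .format("memory") \
    .queryName("avg_table") \
    .outputMode("complete") \
    .start()

time.sleep(10)  # give the first trigger time to populate the table

rows = spark.sql("select Avg from avg_table").collect()  # batch query: collect() is fine
if rows and rows[0]["Avg"] is not None:
    current_avg = rows[0]["Avg"]  # a plain Python value, not a DataFrame
    result = spark.sql(
        "select Col1, Col2, Col2/{0} as Col3 from "
        "transformed_Stream_DF".format(current_avg))
    result.writeStream.format("console").start()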

On Mon, Apr 2, 2018 at 1:01 PM, Aakash Basu wrote:

> Hi,
>
> This is a very interesting requirement, where I am getting stuck at a few
> places.
>
> *Requirement* -
>
> Col1    Col2
> 1       10
> 2       11
> 3       12
> 4       13
> 5       14
>
> *I have to calculate the avg of Col1 and then divide each row of Col2 by
> that avg. And the avg should be updated with every new batch of data fed
> through Kafka into Spark Streaming.*
>
> *Avg(Col1) = Running Avg*
> *Col2 = Col2/Avg(Col1)*
>
>
> *Queries* *-*
>
>
> *1) I am currently trying to run an inner query inside an outer query, to
> print the avg alongside the other column values and do the calculation
> later. But I am getting an error.*
>
> Query -
>
> select t.Col2 , (Select AVG(Col1) as Avg from transformed_Stream_DF) as myAvg 
> from transformed_Stream_DF t
>
> Error -
>
> pyspark.sql.utils.StreamingQueryException: u'Queries with streaming
> sources must be executed with writeStream.start();
>
> Even though I already have writeStream.start() in my code, it is probably
> throwing the error because of the inner select query (I think Spark treats
> it as another query altogether, which requires its own
> writeStream.start()). Any help?
>
>
> *2) How to go about it? *Another idea I have in mind is to query the table
> to get the avg and store it in a variable, then simply pass the variable
> into the second query and divide the second column by it to produce the
> appropriate result. But is that the right approach?
>
> *3) Final question*: How do I do the calculation over the entire data and
> not just the latest batch? Do I need to keep appending somewhere and
> repeatedly use it? My average, and hence every row of Col2, should change
> with each new batch of incoming data.
>
>
> *Code -*
>
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import split, col
>
> spark = SparkSession.builder \
>     .appName("Stream_Col_Oper_Spark") \
>     .getOrCreate()
>
> data = spark.readStream.format("kafka") \
>     .option("startingOffsets", "latest") \
>     .option("kafka.bootstrap.servers", "localhost:9092") \
>     .option("subscribe", "test1") \
>     .load()
>
> ID = data.select('value') \
>     .withColumn('value', data.value.cast("string")) \
>     .withColumn("Col1", split(col("value"), ",").getItem(0)) \
>     .withColumn("Col2", split(col("value"), ",").getItem(1)) \
>     .drop('value')
>
> ID.createOrReplaceTempView("transformed_Stream_DF")
> aggregate_func = spark.sql(
>     "select t.Col2, (select AVG(Col1) as Avg from transformed_Stream_DF) "
>     "as myAvg from transformed_Stream_DF t")  # (Col2/(AVG(Col1)) as Col3)
>
> # ---For Console Print---
>

Re: [Structured Streaming Query] Calculate Running Avg from Kafka feed using SQL query

2018-04-02 Thread Aakash Basu
Any help, guys?

On Mon, Apr 2, 2018 at 1:01 PM, Aakash Basu wrote:

> Hi,
>
> This is a very interesting requirement, where I am getting stuck at a few
> places.
>
> *Requirement* -
>
> Col1    Col2
> 1       10
> 2       11
> 3       12
> 4       13
> 5       14
>
> *I have to calculate the avg of Col1 and then divide each row of Col2 by
> that avg. And the avg should be updated with every new batch of data fed
> through Kafka into Spark Streaming.*
>
> *Avg(Col1) = Running Avg*
> *Col2 = Col2/Avg(Col1)*
>
>
> *Queries* *-*
>
>
> *1) I am currently trying to run an inner query inside an outer query, to
> print the avg alongside the other column values and do the calculation
> later. But I am getting an error.*
>
> Query -
>
> select t.Col2 , (Select AVG(Col1) as Avg from transformed_Stream_DF) as myAvg 
> from transformed_Stream_DF t
>
> Error -
>
> pyspark.sql.utils.StreamingQueryException: u'Queries with streaming
> sources must be executed with writeStream.start();
>
> Even though I already have writeStream.start() in my code, it is probably
> throwing the error because of the inner select query (I think Spark treats
> it as another query altogether, which requires its own
> writeStream.start()). Any help?
>
>
> *2) How to go about it? *Another idea I have in mind is to query the table
> to get the avg and store it in a variable, then simply pass the variable
> into the second query and divide the second column by it to produce the
> appropriate result. But is that the right approach?
>
> *3) Final question*: How do I do the calculation over the entire data and
> not just the latest batch? Do I need to keep appending somewhere and
> repeatedly use it? My average, and hence every row of Col2, should change
> with each new batch of incoming data.
>
>
> *Code -*
>
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import split, col
>
> spark = SparkSession.builder \
>     .appName("Stream_Col_Oper_Spark") \
>     .getOrCreate()
>
> data = spark.readStream.format("kafka") \
>     .option("startingOffsets", "latest") \
>     .option("kafka.bootstrap.servers", "localhost:9092") \
>     .option("subscribe", "test1") \
>     .load()
>
> ID = data.select('value') \
>     .withColumn('value', data.value.cast("string")) \
>     .withColumn("Col1", split(col("value"), ",").getItem(0)) \
>     .withColumn("Col2", split(col("value"), ",").getItem(1)) \
>     .drop('value')
>
> ID.createOrReplaceTempView("transformed_Stream_DF")
> aggregate_func = spark.sql(
>     "select t.Col2, (select AVG(Col1) as Avg from transformed_Stream_DF) "
>     "as myAvg from transformed_Stream_DF t")  # (Col2/(AVG(Col1)) as Col3)
>
> # ---For Console Print---
>
> query = aggregate_func \
>     .writeStream \
>     .format("console") \
>     .start()
> # .outputMode("complete") \
> # ---Console Print ends---
>
> query.awaitTermination()
>
> # /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit
> # --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
> # /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py
>
>
>
>
> Thanks,
> Aakash.
>