Any help, guys?

On Mon, Apr 2, 2018 at 1:01 PM, Aakash Basu <aakash.spark....@gmail.com> wrote:
> Hi,
>
> This is a very interesting requirement, and I am getting stuck in a few
> places.
>
> *Requirement* -
>
> Col1  Col2
> 1     10
> 2     11
> 3     12
> 4     13
> 5     14
>
> *I have to calculate the average of Col1 and then divide each row of Col2
> by that average. The average should be updated with every new batch of
> data fed through Kafka into Spark Streaming.*
>
> *Avg(Col1) = running average*
> *Col2 = Col2 / Avg(Col1)*
>
> *Queries* -
>
> *1) I am currently trying to run an inner query inside a query, print the
> average alongside the other column, and do the calculation later. But I am
> getting an error.*
>
> Query -
>
> select t.Col2, (select AVG(Col1) as Avg from transformed_Stream_DF) as myAvg
> from transformed_Stream_DF t
>
> Error -
>
> pyspark.sql.utils.StreamingQueryException: u'Queries with streaming
> sources must be executed with writeStream.start();
>
> I already call writeStream.start() in my code, so the error is probably
> caused by the inner select (I think Spark treats it as a separate query
> that requires its own writeStream.start()). Any help?
>
> *2) How should I go about it?* Another idea I have is to query the table
> for the average and store it in a variable, then pass that variable to a
> second query that divides the second column to produce the result. Is
> that the right approach?
>
> *3) Final question*: How do I do the calculation over the entire data
> rather than only the latest batch? Do I need to keep appending the data
> somewhere and reuse it repeatedly? The average, and every row of Col2,
> should change with each new batch of incoming data.
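Working the formula through on the sample table makes the expected output concrete. The sketch below is plain Python with no Spark involved; `rows` and `rescale` are hypothetical names, and the extra record `(6, 15)` is a made-up stand-in for new data arriving from Kafka. It shows that a single new record moves Avg(Col1) and therefore changes every rescaled Col2 value, which is why question 3 needs all rows and not just the latest batch.

```python
# Sample table from the question: (Col1, Col2) pairs.
rows = [(1, 10), (2, 11), (3, 12), (4, 13), (5, 14)]


def rescale(rows):
    """Return Avg(Col1) and Col2 / Avg(Col1) for every row."""
    avg_col1 = sum(c1 for c1, _ in rows) / len(rows)
    return avg_col1, [c2 / avg_col1 for _, c2 in rows]


avg_before, col2_before = rescale(rows)
print(avg_before)                          # 3.0
print([round(v, 3) for v in col2_before])  # [3.333, 3.667, 4.0, 4.333, 4.667]

# Hypothetical new record from Kafka: the average moves from 3.0 to 3.5,
# so every previously computed Col2 value has to be recomputed.
rows.append((6, 15))
avg_after, col2_after = rescale(rows)
print(avg_after)                           # 3.5
print([round(v, 3) for v in col2_after])   # [2.857, 3.143, 3.429, 3.714, 4.0, 4.286]
```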
>
> *Code -*
>
> from pyspark.sql import SparkSession
> import time
> from pyspark.sql.functions import split, col
>
> class test:
>
>     spark = SparkSession.builder \
>         .appName("Stream_Col_Oper_Spark") \
>         .getOrCreate()
>
>     data = spark.readStream.format("kafka") \
>         .option("startingOffsets", "latest") \
>         .option("kafka.bootstrap.servers", "localhost:9092") \
>         .option("subscribe", "test1") \
>         .load()
>
>     ID = data.select('value') \
>         .withColumn('value', data.value.cast("string")) \
>         .withColumn("Col1", split(col("value"), ",").getItem(0)) \
>         .withColumn("Col2", split(col("value"), ",").getItem(1)) \
>         .drop('value')
>
>     ID.createOrReplaceTempView("transformed_Stream_DF")
>     aggregate_func = spark.sql(
>         "select t.Col2 , (Select AVG(Col1) as Avg from transformed_Stream_DF) as myAvg from transformed_Stream_DF t")  # (Col2/(AVG(Col1)) as Col3)")
>
>     # -----------For Console Print-----------
>
>     query = aggregate_func \
>         .writeStream \
>         .format("console") \
>         .start()
>     # .outputMode("complete") \
>     # -----------Console Print ends-----------
>
>     query.awaitTermination()
>
> # /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py
>
> Thanks,
> Aakash.
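One possible shape for questions 2 and 3 is to keep the running sum and count of Col1, plus the Col2 values seen so far, as state that is updated once per micro-batch, and only then divide. The plain-Python sketch below illustrates that logic only; the class name `RunningRescaler` is hypothetical, and the `"Col1,Col2"` message format is assumed from the `split(col("value"), ",")` call in the code above. It also casts both fields to numbers, since `getItem()` on the split result returns strings. In Spark itself, `DataStreamWriter.foreachBatch` (added in Spark 2.4, so not available in the 2.3.0 build used here) passes each micro-batch to a function as a regular DataFrame, which is one place such logic could run; treat this as a sketch of the idea, not a tested Spark solution.

```python
class RunningRescaler:
    """Hypothetical helper that carries state across micro-batches."""

    def __init__(self):
        self.total_col1 = 0.0  # running sum of Col1 over all batches
        self.count = 0         # number of rows seen so far
        self.col2_values = []  # every Col2 seen so far (needed to rescale history)

    def process_batch(self, messages):
        """Parse "Col1,Col2" strings, update state, return (avg, rescaled Col2)."""
        for msg in messages:
            col1_str, col2_str = msg.split(",")[:2]
            self.total_col1 += float(col1_str)
            self.count += 1
            self.col2_values.append(float(col2_str))
        if self.count == 0 or self.total_col1 == 0:
            # No data yet, or Avg(Col1) is zero: the division is undefined.
            return None, []
        avg_col1 = self.total_col1 / self.count
        return avg_col1, [c2 / avg_col1 for c2 in self.col2_values]


rescaler = RunningRescaler()
avg1, out1 = rescaler.process_batch(["1,10", "2,11", "3,12"])
print(avg1, out1)                           # 2.0 [5.0, 5.5, 6.0]
avg2, out2 = rescaler.process_batch(["4,13", "5,14"])
print(avg2, [round(v, 3) for v in out2])    # 3.0 [3.333, 3.667, 4.0, 4.333, 4.667]
```

The average alone only needs the sum and count, but rescaling every historical Col2 row (question 3) needs the rows themselves, and that list grows without bound; in practice it would have to live in external storage or be capped.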