Any help? I urgently need help with this. Could someone please clarify this doubt?
---------- Forwarded message ----------
From: Aakash Basu <aakash.spark....@gmail.com>
Date: Thu, Apr 5, 2018 at 2:50 PM
Subject: Spark Structured Streaming Inner Queries fails
To: user <user@spark.apache.org>

Hi,

Why are inner queries not allowed in Spark Structured Streaming? Spark treats the inner query as a separate stream altogether and expects it to be started with its own writeStream.start(). Why is that?

Error:

pyspark.sql.utils.StreamingQueryException: 'Queries with streaming sources must be executed with writeStream.start();;
textSocket
=== Streaming Query ===
Identifier: [id = f77611ee-ce1c-4b16-8812-0f1afe05562c, runId = 0bb4d880-1a4d-4a6c-8fe0-2b4977ab52d0]
Current Committed Offsets: {}
Current Available Offsets: {TextSocketSource[host: localhost, port: 9998]: 5}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [col1#3, col2#4, (cast(col2#4 as double) / scalar-subquery#8 []) AS col3#9]
:  +- Aggregate [avg(cast(col1#3 as double)) AS aver#7]
:     +- SubqueryAlias ds
:        +- Project [split(value#1, ,)[0] AS col1#3, split(value#1, ,)[1] AS col2#4]
:           +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3ac605bf, socket,List(),None,List(),None,Map(header -> true, host -> localhost, path -> csv, port -> 9998),None), textSocket, [value#1]
+- SubqueryAlias ds
   +- Project [split(value#1, ,)[0] AS col1#3, split(value#1, ,)[1] AS col2#4]
      +- StreamingExecutionRelation TextSocketSource[host: localhost, port: 9998], [value#1]
'

Code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession \
    .builder \
    .appName("StructuredRunningAvg") \
    .getOrCreate()

data = spark \
    .readStream \
    .format("socket") \
    .option("header", "true") \
    .option("host", "localhost") \
    .option("port", 9998) \
    .load("csv")

id = data.select(split(data.value, ",").getItem(0).alias("col1"),
                 split(data.value, ",").getItem(1).alias("col2"))

id.createOrReplaceTempView("ds")

final_DF = spark.sql(
    "Select col1, col2, col2/(select avg(col1) as aver from ds) col3 from ds")

query = final_DF \
    .writeStream \
    .format("console") \
    .start()

query.awaitTermination()

Thanks,
Aakash.
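[Editor's note: the thread above leaves the question unanswered. As a hedged sketch, not a reply from the thread: scalar subqueries over a streaming relation are among the operations Structured Streaming does not support, because the subquery would itself be an unbounded stream. One commonly suggested workaround, assuming Spark 2.4+ (which added foreachBatch), is to run the same SQL inside a foreachBatch sink, where each micro-batch is an ordinary static DataFrame and the scalar subquery is legal. The function name normalize_batch below is a hypothetical choice, not from the thread.]

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StructuredRunningAvg").getOrCreate()

def normalize_batch(batch_df, batch_id):
    # Inside foreachBatch, batch_df is a plain (non-streaming) DataFrame,
    # so the scalar subquery that fails on a streaming relation works here.
    # Note: the average is per micro-batch, not a global running average.
    batch_df.createOrReplaceTempView("ds")
    out = spark.sql(
        "SELECT col1, col2, col2 / (SELECT avg(col1) FROM ds) AS col3 FROM ds")
    out.show()

# Reuse the same socket source and split() projection as above to build
# the streaming DataFrame `id`, then attach the batch function as a sink:
# query = id.writeStream.foreachBatch(normalize_batch).start()
# query.awaitTermination()

Whether this fits depends on the semantics you want: foreachBatch gives a per-batch average; a true running average across all data seen so far would instead need a stateful aggregation.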