Any help?

I need urgent help with this. Could someone please clarify?

---------- Forwarded message ----------
From: Aakash Basu <aakash.spark....@gmail.com>
Date: Thu, Apr 5, 2018 at 2:50 PM
Subject: Spark Structured Streaming Inner Queries fails
To: user <user@spark.apache.org>


Hi,

Why are inner queries (scalar subqueries) not allowed in Spark Structured
Streaming? Spark treats the inner query as a separate stream altogether and
expects it to be started with its own writeStream.start().

Why is that?
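As far as I can tell, the same restriction applies to any eager execution of
a streaming plan, not just subqueries. Here is a minimal sketch (the host and
port are placeholders) that triggers the same message:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Demo").getOrCreate()

stream = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Any eager action on a streaming DataFrame fails the same way, because
# streaming plans may only be executed through writeStream.start():
stream.count()
# pyspark.sql.utils.AnalysisException: 'Queries with streaming sources
# must be executed with writeStream.start();;...'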

Error: pyspark.sql.utils.StreamingQueryException:
'Queries with streaming sources must be executed with writeStream.start();;
textSocket
=== Streaming Query ===
Identifier: [id = f77611ee-ce1c-4b16-8812-0f1afe05562c, runId = 0bb4d880-1a4d-4a6c-8fe0-2b4977ab52d0]
Current Committed Offsets: {}
Current Available Offsets: {TextSocketSource[host: localhost, port: 9998]: 5}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [col1#3, col2#4, (cast(col2#4 as double) / scalar-subquery#8 []) AS col3#9]
:  +- Aggregate [avg(cast(col1#3 as double)) AS aver#7]
:     +- SubqueryAlias ds
:        +- Project [split(value#1, ,)[0] AS col1#3, split(value#1, ,)[1] AS col2#4]
:           +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3ac605bf,socket,List(),None,List(),None,Map(header -> true, host -> localhost, path -> csv, port -> 9998),None), textSocket, [value#1]
+- SubqueryAlias ds
   +- Project [split(value#1, ,)[0] AS col1#3, split(value#1, ,)[1] AS col2#4]
      +- StreamingExecutionRelation TextSocketSource[host: localhost, port: 9998], [value#1]'

from pyspark.sql import SparkSession
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredRunningAvg") \
    .getOrCreate()

# Read comma-separated lines from a socket. Note: the socket source
# ignores the "header" and path options (they apply to file sources),
# as seen in the logical plan above.
data = spark \
    .readStream \
    .format("socket") \
    .option("header", "true") \
    .option("host", "localhost") \
    .option("port", 9998) \
    .load("csv")

# Split each comma-separated line into col1 and col2.
parsed = data.select(split(data.value, ",").getItem(0).alias("col1"),
                     split(data.value, ",").getItem(1).alias("col2"))

parsed.createOrReplaceTempView("ds")

# The scalar subquery below is what raises the exception.
final_DF = spark.sql(
    "SELECT col1, col2, "
    "col2 / (SELECT avg(col1) AS aver FROM ds) AS col3 "
    "FROM ds")

query = final_DF \
    .writeStream \
    .format("console") \
    .start()

query.awaitTermination()
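
For anyone hitting the same thing, a possible workaround, assuming Spark 2.4+
where foreachBatch is available: each micro-batch arrives as a plain
(non-streaming) DataFrame, so the aggregate can be computed there. Note this
averages over each micro-batch only, not over the whole stream so far:

from pyspark.sql.functions import avg, col

def with_ratio(batch_df, batch_id):
    # Collapse the batch to a single scalar average, then divide per row.
    aver = batch_df.agg(avg("col1")).first()[0]
    if aver is not None:
        batch_df.withColumn("col3", col("col2") / aver).show()

query2 = parsed.writeStream.foreachBatch(with_ratio).start()
query2.awaitTermination()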



Thanks,
Aakash.
