Hi Jian,

I found this link that could be useful:
https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application
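For what it's worth, here is a minimal sketch of what that looks like in PySpark. The 'rate' source and the query names are placeholders, not from your code: the point is that spark.scheduler.mode is set when the session is built, both queries are started from the same session, and the driver waits on spark.streams rather than on one query at a time.

from pyspark.sql import SparkSession

# Enable the FAIR scheduler when the session is created. Note that
# setLocalProperty is only for picking a pool ('spark.scheduler.pool');
# the mode itself must be set in the Spark config.
spark = (SparkSession.builder
         .appName("FairSchedulerSketch")
         .config("spark.scheduler.mode", "FAIR")
         .getOrCreate())

# 'rate' is a built-in test source that generates rows on a timer,
# so this sketch runs without any external input.
stream = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

q1 = stream.writeStream.queryName("q1").format("console").outputMode("append").start()
q2 = stream.writeStream.queryName("q2").format("console").outputMode("append").start()

# Block until any query terminates, instead of awaiting q1 then q2.
spark.streams.awaitAnyTermination()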
By the way, you could also try giving the cluster enough resources to run both jobs without defining a scheduler, i.e. run the queries with the default scheduler but provide enough memory in the Spark cluster for both.

Regards,
Amit Joshi

On Sat, May 22, 2021 at 5:41 AM <jia...@xtronica.no> wrote:
> Hi Amit,
>
> Thank you for your prompt reply and kind help. I wonder how to set the
> scheduler to FAIR mode in Python. The following code does not seem to
> work:
>
> conf = SparkConf().setMaster("local").setAppName("HSMSTest1")
> sc = SparkContext(conf=conf)
> sc.setLocalProperty('spark.scheduler.mode', 'FAIR')
> spark = SparkSession.builder.appName("HSMSStructedStreaming1").getOrCreate()
>
> By the way, as I am using nc -lk 9999 to feed the stream, could the
> problem be that the input stream can only be consumed by one query, as
> mentioned in this post?
>
> https://stackoverflow.com/questions/45618489/executing-separate-streaming-queries-in-spark-structured-streaming
>
> I appreciate your further help/support.
>
> Best Regards,
>
> Jian Xu
>
> *From:* Amit Joshi <mailtojoshia...@gmail.com>
> *Sent:* Friday, May 21, 2021 12:52 PM
> *To:* jia...@xtronica.no
> *Cc:* user@spark.apache.org
> *Subject:* Re: multiple query with structured streaming in spark does not work
>
> Hi Jian,
>
> You have to use the same Spark session to run all the queries, and use
> the following to wait for termination:
>
> q1 = writestream1.start()
> q2 = writestream2.start()
> spark.streams.awaitAnyTermination()
>
> Also set the scheduler in the Spark config to the FAIR scheduler.
>
> Regards,
> Amit Joshi
>
> On Saturday, May 22, 2021, <jia...@xtronica.no> wrote:
>
> Hi there,
>
> I am new to Spark. We are using Spark to develop our app for streaming
> sensor readings.
>
> I am having trouble getting two structured streaming queries to run
> concurrently. The code below only works with one of them. I wonder if
> there is any way to get both running; I appreciate help from the team.
>
> Regards,
>
> Jian Xu
>
> from datetime import datetime
> from pyspark.sql.functions import split, window
> from pyspark.sql.types import ArrayType, FloatType
>
> hostName = 'localhost'
> portNumber = 9999
> wSize = '10 seconds'
> sSize = '2 seconds'
>
> def wnq_fb_func(batch_df, batch_id):
>     print("batch is processed from time:{}".format(datetime.now()))
>     print(batch_df.collect())
>     batch_df.show(10, False, False)
>
> lines = spark.readStream.format('socket').option('host', hostName) \
>     .option('port', portNumber).option('includeTimestamp', True).load()
>
> nSensors = 3
>
> scols = split(lines.value, ',').cast(ArrayType(FloatType()))
> sensorCols = []
> for i in range(nSensors):
>     sensorCols.append(scols.getItem(i).alias('sensor' + str(i)))
>
> nlines = lines.select(lines.timestamp, lines.value, *sensorCols)
> nlines.printSchema()
>
> wnlines = nlines.select(window(nlines.timestamp, wSize, sSize).alias('TimeWindow'),
>                         *lines.columns)
> wnquery = wnlines.writeStream.trigger(processingTime=sSize) \
>     .outputMode('append').foreachBatch(wnq_fb_func).start()
>
> nquery = nlines.writeStream.outputMode('append').format('console').start()
> nquery.awaitTermination()
> wnquery.awaitTermination()
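P.S. Regarding nc -lk 9999: as discussed in the Stack Overflow post above, each structured streaming query opens its own connection to the socket source, and nc delivers each input line to only one connection, so two queries will compete for the same input. A minimal sketch of a single-query workaround, assuming the wnlines DataFrame and sSize from your code, and assuming the goal is to produce both the raw and the windowed view from one connection:

# Sketch: fan out to both outputs from a single query via foreachBatch,
# so only one socket connection is opened against nc.
def both_outputs(batch_df, batch_id):
    batch_df.persist()  # the batch is reused by both selects below
    # raw view: the original socket columns
    batch_df.select('timestamp', 'value').show(10, truncate=False)
    # windowed view: the TimeWindow column added upstream
    batch_df.select('TimeWindow', 'value').show(10, truncate=False)
    batch_df.unpersist()

query = (wnlines.writeStream
         .trigger(processingTime=sSize)
         .outputMode('append')
         .foreachBatch(both_outputs)
         .start())
query.awaitTermination()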