Re: multiple query with structured streaming in spark does not work

2021-05-21 Thread Amit Joshi
Hi Jian,

I found this link that could be useful.
https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application

By the way, you can also try giving enough resources to run both jobs
without defining the scheduler.
I mean, run the queries with the default scheduler, but provide enough memory in
the Spark cluster to run both.


Regards
Amit Joshi



On Sat, May 22, 2021 at 5:41 AM  wrote:

> Hi Amit;
>
>
>
> Thank you for your prompt reply and kind help. I wonder how to set the
> scheduler to FAIR mode in Python. The following code does not seem to work
> for me.
>
>
>
> conf = SparkConf().setMaster("local").setAppName("HSMSTest1")
>
> sc = SparkContext(conf=conf)
>
> sc.setLocalProperty('spark.scheduler.mode', 'FAIR')
>
> spark =
> SparkSession.builder.appName("HSMSStructedStreaming1").getOrCreate()
>
>
>
> By the way, since I am using nc -lk  to feed the input stream, could the
> cause be that the input stream can only be consumed by one query, as
> mentioned in the post below?
>
>
>
>
> https://stackoverflow.com/questions/45618489/executing-separate-streaming-queries-in-spark-structured-streaming
>
>
>
> I appreciate your further help/support.
>
>
>
> Best Regards,
>
>
>
> Jian Xu
>
>
>
> *From:* Amit Joshi 
> *Sent:* Friday, May 21, 2021 12:52 PM
> *To:* jia...@xtronica.no
> *Cc:* user@spark.apache.org
> *Subject:* Re: multiple query with structured streaming in spark does not
> work
>
>
>
> Hi Jian,
>
>
>
> You have to use the same Spark session to run all the queries.
>
> And use the following to wait for termination:
>
>
>
> q1 = writestream.start()
>
> q2 = writestream2.start()
>
> spark.streams.awaitAnyTermination()
>
>
>
> Also set the scheduler in the Spark config to the FAIR scheduler.
>
>
>
>
>
> Regards
>
> Amit Joshi
>
>
>
>
>
> On Saturday, May 22, 2021,  wrote:
>
> Hi There;
>
>
>
> I am new to Spark. We are using Spark to develop our app for data
> streaming with sensor readings.
>
>
>
> I am having trouble getting two queries with Structured Streaming to run
> concurrently.
>
>
>
> Following is the code. It only works with one of them. I wonder if there
> is any way to get both running. I appreciate help from the team.
>
>
>
> Regards,
>
>
>
> Jian Xu
>
>
>
>
>
> hostName = 'localhost'
>
> portNumber= 
>
> wSize= '10 seconds'
>
> sSize ='2 seconds'
>
>
>
> def wnq_fb_func(batch_df, batch_id):
>
>     print("batch is processed from time:{}".format(datetime.now()))
>
>     print(batch_df.collect())
>
>     batch_df.show(10, False, False)
>
>
>
> lines = spark.readStream.format('socket').option('host',
> hostName).option('port', portNumber).option('includeTimestamp', True).load()
>
>
>
> nSensors=3
>
>
>
> scols = split(lines.value, ',').cast(ArrayType(FloatType()))
>
> sensorCols = []
>
> for i in range(nSensors):
>
>     sensorCols.append(scols.getItem(i).alias('sensor' + str(i)))
>
>
>
> nlines=lines.select(lines.timestamp,lines.value, *sensorCols)
>
> nlines.printSchema()
>
>
>
> wnlines =nlines.select(window(nlines.timestamp, wSize,
> sSize).alias('TimeWindow'), *lines.columns)
>
> wnquery= wnlines.writeStream.trigger(processingTime=sSize)\
>
> .outputMode('append').foreachBatch(wnq_fb_func).start()
>
>
>
> nquery=nlines.writeStream.outputMode('append').format('console').start()
>
> nquery.awaitTermination()
>
> wnquery.awaitTermination()
>
>
>
>
>
>
>
>


RE: multiple query with structured streaming in spark does not work

2021-05-21 Thread jianxu
Hi Amit;

Further to my last email, I managed to set the scheduler to FAIR via:

conf = SparkConf().setMaster("local").setAppName("HSMSTest1").set("spark.scheduler.mode", "FAIR")

I can see the mode has changed in the web UI, but the result is the same; it still
does not work. It might be for the reason stated in the post. My question is
how to feed multiple queries from the same input stream over a socket, or
whether that is simply not possible with the socket streaming source at all.
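
One possible workaround (a sketch of my own, not something confirmed in this
thread): consume the socket stream with a single streaming query and produce
both outputs inside one foreachBatch function, reusing the nlines, wSize and
sSize definitions from the code quoted below. That way the socket source is
read by exactly one query.

from pyspark.sql.functions import window

def dual_sink_func(batch_df, batch_id):
    # Cache the micro-batch because it is written to two sinks.
    batch_df.persist()
    # Output 1: the raw rows, console-style.
    batch_df.show(10, truncate=False)
    # Output 2: the same rows with the time-window column added.
    windowed = batch_df.select(
        window(batch_df.timestamp, wSize, sSize).alias('TimeWindow'),
        *batch_df.columns)
    windowed.show(10, truncate=False)
    batch_df.unpersist()

single_query = nlines.writeStream.trigger(processingTime=sSize) \
    .outputMode('append').foreachBatch(dual_sink_func).start()
single_query.awaitTermination()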

Regards,

Jian Xu

From: jia...@xtronica.no
Sent: Friday, May 21, 2021 5:10 PM
To: 'Amit Joshi'
Cc: user@spark.apache.org
Subject: RE: multiple query with structured streaming in spark does not work

Hi Amit;

Thank you for your prompt reply and kind help. I wonder how to set the scheduler
to FAIR mode in Python. The following code does not seem to work for me.

conf = SparkConf().setMaster("local").setAppName("HSMSTest1")

sc = SparkContext(conf=conf)

sc.setLocalProperty('spark.scheduler.mode', 'FAIR')

spark = SparkSession.builder.appName("HSMSStructedStreaming1").getOrCreate()

By the way, since I am using nc -lk  to feed the input stream, could the cause be
that the input stream can only be consumed by one query, as mentioned in the
post below?

https://stackoverflow.com/questions/45618489/executing-separate-streaming-queries-in-spark-structured-streaming

I appreciate your further help/support.

Best Regards,

Jian Xu

From: Amit Joshi <mailtojoshia...@gmail.com>
Sent: Friday, May 21, 2021 12:52 PM
To: jia...@xtronica.no
Cc: user@spark.apache.org
Subject: Re: multiple query with structured streaming in spark does not work

Hi Jian,

You have to use the same Spark session to run all the queries.

And use the following to wait for termination:

q1 = writestream.start()

q2 = writestream2.start()

spark.streams.awaitAnyTermination()

Also set the scheduler in the Spark config to the FAIR scheduler.

Regards

Amit Joshi



On Saturday, May 22, 2021, <jia...@xtronica.no> wrote:

Hi There;

I am new to Spark. We are using Spark to develop our app for data streaming
with sensor readings.

I am having trouble getting two queries with Structured Streaming to run
concurrently.

Following is the code. It only works with one of them. I wonder if there is
any way to get both running. I appreciate help from the team.

Regards,

Jian Xu

hostName = 'localhost'

portNumber= 

wSize = '10 seconds'

sSize = '2 seconds'

def wnq_fb_func(batch_df, batch_id):

    print("batch is processed from time:{}".format(datetime.now()))

    print(batch_df.collect())

    batch_df.show(10, False, False)

lines = spark.readStream.format('socket').option('host', hostName).option('port', portNumber).option('includeTimestamp', True).load()

nSensors = 3

scols = split(lines.value, ',').cast(ArrayType(FloatType()))

sensorCols = []

for i in range(nSensors):

    sensorCols.append(scols.getItem(i).alias('sensor' + str(i)))

nlines = lines.select(lines.timestamp, lines.value, *sensorCols)

nlines.printSchema()

wnlines = nlines.select(window(nlines.timestamp, wSize, sSize).alias('TimeWindow'), *lines.columns)

wnquery = wnlines.writeStream.trigger(processingTime=sSize)\
    .outputMode('append').foreachBatch(wnq_fb_func).start()

nquery = nlines.writeStream.outputMode('append').format('console').start()

nquery.awaitTermination()

wnquery.awaitTermination()



RE: multiple query with structured streaming in spark does not work

2021-05-21 Thread jianxu
Hi Amit;

Thank you for your prompt reply and kind help. I wonder how to set the scheduler
to FAIR mode in Python. The following code does not seem to work for me.

conf = SparkConf().setMaster("local").setAppName("HSMSTest1")

sc = SparkContext(conf=conf)

sc.setLocalProperty('spark.scheduler.mode', 'FAIR')

spark = SparkSession.builder.appName("HSMSStructedStreaming1").getOrCreate()

By the way, since I am using nc -lk  to feed the input stream, could the cause be
that the input stream can only be consumed by one query, as mentioned in the
post below?

https://stackoverflow.com/questions/45618489/executing-separate-streaming-queries-in-spark-structured-streaming

I appreciate your further help/support.

Best Regards,

Jian Xu

From: Amit Joshi
Sent: Friday, May 21, 2021 12:52 PM
To: jia...@xtronica.no
Cc: user@spark.apache.org
Subject: Re: multiple query with structured streaming in spark does not work

Hi Jian,

You have to use the same Spark session to run all the queries.

And use the following to wait for termination:

q1 = writestream.start()

q2 = writestream2.start()

spark.streams.awaitAnyTermination()

Also set the scheduler in the Spark config to the FAIR scheduler.

Regards

Amit Joshi



On Saturday, May 22, 2021, <jia...@xtronica.no> wrote:

Hi There;

I am new to Spark. We are using Spark to develop our app for data streaming
with sensor readings.

I am having trouble getting two queries with Structured Streaming to run
concurrently.

Following is the code. It only works with one of them. I wonder if there is
any way to get both running. I appreciate help from the team.

Regards,

Jian Xu

hostName = 'localhost'

portNumber= 

wSize = '10 seconds'

sSize = '2 seconds'

def wnq_fb_func(batch_df, batch_id):

    print("batch is processed from time:{}".format(datetime.now()))

    print(batch_df.collect())

    batch_df.show(10, False, False)

lines = spark.readStream.format('socket').option('host', hostName).option('port', portNumber).option('includeTimestamp', True).load()

nSensors = 3

scols = split(lines.value, ',').cast(ArrayType(FloatType()))

sensorCols = []

for i in range(nSensors):

    sensorCols.append(scols.getItem(i).alias('sensor' + str(i)))

nlines = lines.select(lines.timestamp, lines.value, *sensorCols)

nlines.printSchema()

wnlines = nlines.select(window(nlines.timestamp, wSize, sSize).alias('TimeWindow'), *lines.columns)

wnquery = wnlines.writeStream.trigger(processingTime=sSize)\
    .outputMode('append').foreachBatch(wnq_fb_func).start()

nquery = nlines.writeStream.outputMode('append').format('console').start()

nquery.awaitTermination()

wnquery.awaitTermination()



Re: multiple query with structured streaming in spark does not work

2021-05-21 Thread Amit Joshi
Hi Jian,

You have to use the same Spark session to run all the queries.
And use the following to wait for termination:

q1 = writestream.start()
q2 = writestream2.start()
spark.streams.awaitAnyTermination()

Also set the scheduler in the Spark config to the FAIR scheduler.
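
In PySpark that could look roughly like the following (a sketch based on the
code quoted below, reusing its wnlines, nlines, wnq_fb_func and sSize
definitions; both queries share the one Spark session):

q1 = wnlines.writeStream.trigger(processingTime=sSize) \
    .outputMode('append').foreachBatch(wnq_fb_func).start()

q2 = nlines.writeStream.outputMode('append').format('console').start()

# Block until any active query stops, instead of blocking on the
# first query's awaitTermination() call.
spark.streams.awaitAnyTermination()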


Regards
Amit Joshi



On Saturday, May 22, 2021,  wrote:

> Hi There;
>
>
>
> I am new to Spark. We are using Spark to develop our app for data
> streaming with sensor readings.
>
>
>
> I am having trouble getting two queries with Structured Streaming to run
> concurrently.
>
>
>
> Following is the code. It only works with one of them. I wonder if there
> is any way to get both running. I appreciate help from the team.
>
>
>
> Regards,
>
>
>
> Jian Xu
>
>
>
>
>
> hostName = 'localhost'
>
> portNumber= 
>
> wSize= '10 seconds'
>
> sSize ='2 seconds'
>
>
>
> def wnq_fb_func(batch_df, batch_id):
>
>     print("batch is processed from time:{}".format(datetime.now()))
>
>     print(batch_df.collect())
>
>     batch_df.show(10, False, False)
>
>
>
> lines = spark.readStream.format('socket').option('host',
> hostName).option('port', portNumber).option('includeTimestamp',
> True).load()
>
>
>
> nSensors=3
>
>
>
> scols = split(lines.value, ',').cast(ArrayType(FloatType()))
>
> sensorCols = []
>
> for i in range(nSensors):
>
>     sensorCols.append(scols.getItem(i).alias('sensor' + str(i)))
>
>
>
> nlines=lines.select(lines.timestamp,lines.value, *sensorCols)
>
> nlines.printSchema()
>
>
>
> wnlines =nlines.select(window(nlines.timestamp, wSize,
> sSize).alias('TimeWindow'), *lines.columns)
>
> wnquery= wnlines.writeStream.trigger(processingTime=sSize)\
>
> .outputMode('append').foreachBatch(wnq_fb_func).start()
>
>
>
> nquery=nlines.writeStream.outputMode('append').format('console').start()
>
> nquery.awaitTermination()
>
> wnquery.awaitTermination()
>
>
>
>
>
>
>


multiple query with structured streaming in spark does not work

2021-05-21 Thread jianxu
Hi There;

I am new to Spark. We are using Spark to develop our app for data streaming
with sensor readings.

I am having trouble getting two queries with Structured Streaming to run
concurrently.

Following is the code. It only works with one of them. I wonder if there is
any way to get both running. I appreciate help from the team.

Regards,

Jian Xu

# Imports needed by this snippet (spark is assumed to be the active SparkSession).
from datetime import datetime
from pyspark.sql.functions import split, window
from pyspark.sql.types import ArrayType, FloatType

hostName = 'localhost'

portNumber= 

wSize = '10 seconds'

sSize = '2 seconds'

def wnq_fb_func(batch_df, batch_id):

    print("batch is processed from time:{}".format(datetime.now()))

    print(batch_df.collect())

    batch_df.show(10, False, False)

lines = spark.readStream.format('socket').option('host', hostName).option('port', portNumber).option('includeTimestamp', True).load()

nSensors = 3

scols = split(lines.value, ',').cast(ArrayType(FloatType()))

sensorCols = []

for i in range(nSensors):

    sensorCols.append(scols.getItem(i).alias('sensor' + str(i)))

nlines = lines.select(lines.timestamp, lines.value, *sensorCols)

nlines.printSchema()

wnlines = nlines.select(window(nlines.timestamp, wSize, sSize).alias('TimeWindow'), *lines.columns)

wnquery = wnlines.writeStream.trigger(processingTime=sSize)\
    .outputMode('append').foreachBatch(wnq_fb_func).start()

nquery = nlines.writeStream.outputMode('append').format('console').start()

nquery.awaitTermination()

wnquery.awaitTermination()



Re: Calculate average from Spark stream

2021-05-21 Thread Mich Talebzadeh
OK, where is your watermark created? That is the one that works out the
average temperature!

try:
    # construct a streaming dataframe streamingDataFrame that
    # subscribes to topic temperature
    streamingDataFrame = self.spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers'],) \
        .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
        .option("group.id", config['common']['appName']) \
        .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
        .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
        .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
        .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
        .option("subscribe", "temperature") \
        .option("failOnDataLoss", "false") \
        .option("includeHeaders", "true") \
        .option("startingOffsets", "latest") \
        .load() \
        .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

    resultC = streamingDataFrame.select(
          col("parsed_value.rowkey").alias("rowkey")          # you do not need this
        , col("parsed_value.timestamp").alias("timestamp")
        , col("parsed_value.temperature").alias("temperature"))

    """
    We work out the window and the AVG(temperature) in the window's timeframe below.
    This should return the following Dataframe as struct:

    root
     |-- window: struct (nullable = false)
     |    |-- start: timestamp (nullable = true)
     |    |-- end: timestamp (nullable = true)
     |-- avg(temperature): double (nullable = true)
    """

    resultM = resultC. \
        withWatermark("timestamp", "5 minutes"). \
        groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
        avg('temperature')

    # We take the above Dataframe resultM and flatten it to get the columns
    # aliased as "startOfWindowFrame", "endOfWindowFrame" and "AVGTemperature"
    resultMF = resultM. \
        select(
              F.col("window.start").alias("startOfWindowFrame")
            , F.col("window.end").alias("endOfWindowFrame")
            , F.col("avg(temperature)").alias("AVGTemperature"))

    resultMF.printSchema()

    result = resultMF. \
        writeStream. \
        outputMode('complete'). \
        option("numRows", 1000). \
        option("truncate", "false"). \
        format('console'). \
        option('checkpointLocation', checkpoint_path). \
        queryName("temperature"). \
        start()

except Exception as e:
    print(f"""{e}, quitting""")
    sys.exit(1)

result.awaitTermination()

This should work and return the average temperature values between the window
start and end.

Sample output

root
 |-- startOfWindowFrame: timestamp (nullable = true)
 |-- endOfWindowFrame: timestamp (nullable = true)
 |-- AVGTemperature: double (nullable = true)

-------------------------------------------
Batch: 15
-------------------------------------------
+-------------------+-------------------+--------------+
|startOfWindowFrame |endOfWindowFrame   |AVGTemperature|
+-------------------+-------------------+--------------+
|2021-05-17 19:35:00|2021-05-17 19:40:00|24.8          |
|2021-05-17 19:45:00|2021-05-17 19:50:00|27.0          |
|2021-05-17 20:25:00|2021-05-17 20:30:00|24.4          |
|2021-05-17 20:20:00|2021-05-17 20:25:00|25.4          |
|2021-05-17 19:25:00|2021-05-17 19:30:00|24.25         |
|2021-05-17 19:55:00|2021-05-17 20:00:00|23.5          |
|2021-05-21 15:30:00|2021-05-21 15:35:00|23.0          |
|2021-05-17 19:50:00|2021-05-17 19:55:00|25.0          |
|2021-05-17 20:30:00|2021-05-17 20:35:00|25.8          |
|2021-05-17 20:10:00|2021-05-17 20:15:00|25.25         |
|2021-05-17 19:30:00|2021-05-17 19:35:00|27.0          |
|2021-05-17 20:15:00|2021-05-17 20:20:00|23.8          |
|2021-05-17 20:00:00|2021-05-17 20:05:00|24.668        |
|2021-05-17 19:40:00|2021-05-17 19:45:00|25.5          |
|2021-05-17 20:05:00|2021-05-17 20:10:00|26.4          |
+-------------------+-------------------+--------------+


HTH




Re: DF blank value fill

2021-05-21 Thread ayan guha
Hi

You can do something like this:

SELECT MainKey, SubKey,
       case when val1 is null then newval1 else val1 end as val1,
       case when val2 is null then newval2 else val2 end as val2,
       case when val3 is null then newval3 else val3 end as val3
  from (select MainKey, SubKey, val1, val2, val3,
               first_value(val1) over (partition by MainKey, SubKey order by val1 nulls last) as newval1,
               first_value(val2) over (partition by MainKey, SubKey order by val2 nulls last) as newval2,
               first_value(val3) over (partition by MainKey, SubKey order by val3 nulls last) as newval3
          from table) x
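
Since the original question mentions pyspark, a rough DataFrame-API equivalent
could look like this (a sketch; df stands for the input DataFrame, and it relies
on each non-null value being unique per MainKey/SubKey combination, as stated
below):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Whole-partition window: no ordering needed, we just want any
# non-null value shared by the MainKey/SubKey combination.
w = Window.partitionBy("MainKey", "SubKey")

filled = df.select(
    "MainKey", "SubKey",
    *[F.first(c, ignorenulls=True).over(w).alias(c)
      for c in ["Val1", "Val2", "Val3"]])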

On Fri, May 21, 2021 at 9:29 PM Bode, Meikel, NMA-CFD <
meikel.b...@bertelsmann.de> wrote:

> Hi all,
>
>
>
> My df looks like follows:
>
>
>
> Situation:
>
> MainKey, SubKey, Val1, Val2, Val3, …
>
> 1, 2, a, null, c
>
> 1, 2, null, null, c
>
> 1, 3, null, b, null
>
> 1, 3, a, null, c
>
>
>
>
>
> Desired outcome:
>
> 1, 2, a, b, c
>
> 1, 2, a, b, c
>
> 1, 3, a, b, c
>
> 1, 3, a, b, c
>
>
>
>
>
> How could I populate/synchronize empty cells of all records with the same
> combination of MainKey and SubKey with the respective value of other rows
> with the same key combination?
>
> A certain value, if not null, of a col is guaranteed to be unique within
> the df. If a col exists then there is at least one row with a not-null
> value.
>
>
>
> I am using pyspark.
>
>
>
> Thanks for any hint,
>
> Best
>
> Meikel
>


-- 
Best Regards,
Ayan Guha


Re: [External Sender] Memory issues in 3.0.2 but works well on 2.4.4

2021-05-21 Thread Femi Anthony
Post the stack trace and provide some more details about your configuration.
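
For example, the effective configuration of the running session can be dumped
for comparison between the 2.4.4 and 3.0.2 runs (a small generic sketch, not
specific to this model):

# Print the effective Spark configuration of the active session.
for key, value in sorted(spark.sparkContext.getConf().getAll()):
    print(key, "=", value)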

On Fri, May 21, 2021 at 7:52 AM Praneeth Shishtla 
wrote:

> Hi,
> I have a simple DecisionForest model and was able to train the model on
> pyspark==2.4.4 without any issues.
> However, when I upgraded to pyspark==3.0.2, the fit takes a lot of time and
> eventually errors out saying out of memory. Even tried reducing the number
> of samples for training but no luck.
> Can anyone help with this?
>
> Best,
> Praneeth
>
>
>
>
> --
> Sent from:
> http://apache-spark-user-list.1001560.n3.nabble.com/
>
>
> --
Card Machine Learning (ML) Team, Capital One

__



The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates and may only be used solely in performance of 
work or services for Capital One. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed. If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.





Memory issues in 3.0.2 but works well on 2.4.4

2021-05-21 Thread Praneeth Shishtla
Hi,
I have a simple DecisionForest model and was able to train the model on
pyspark==2.4.4 without any issues.
However, when I upgraded to pyspark==3.0.2, the fit takes a lot of time and
eventually errors out saying out of memory. Even tried reducing the number
of samples for training but no luck.
Can anyone help with this?

Best,
Praneeth




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/




DF blank value fill

2021-05-21 Thread Bode, Meikel, NMA-CFD
Hi all,

My df looks like follows:

Situation:
MainKey, SubKey, Val1, Val2, Val3, ...
1, 2, a, null, c
1, 2, null, null, c
1, 3, null, b, null
1, 3, a, null, c


Desired outcome:
1, 2, a, b, c
1, 2, a, b, c
1, 3, a, b, c
1, 3, a, b, c


How could I populate/synchronize empty cells of all records with the same 
combination of MainKey and SubKey with the respective value of other rows with 
the same key combination?
A certain value, if not null, of a col is guaranteed to be unique within the 
df. If a col exists then there is at least one row with a not-null value.

I am using pyspark.

Thanks for any hint,
Best
Meikel