[Structured Streaming Query] Calculate Running Avg from Kafka feed using SQL query
Hi,

This is a very interesting requirement, where I am getting stuck at a few places.

*Requirement* -

Col1    Col2
1       10
2       11
3       12
4       13
5       14

*I have to calculate the avg of Col1 and then divide each row of Col2 by that avg. And the avg should be updated with every new piece of data being fed through Kafka into Spark Streaming.*

*Avg(Col1) = Running Avg*
*Col2 = Col2/Avg(Col1)*

*Queries* -

*1) I am currently trying to simply run an inner query inside a query and print the avg with the other column value, and then do the calculation later. But I am getting an error.*

Query -

select t.Col2, (select AVG(Col1) as Avg from transformed_Stream_DF) as myAvg from transformed_Stream_DF t

Error -

pyspark.sql.utils.StreamingQueryException: u'Queries with streaming sources must be executed with writeStream.start();

Even though I already have writeStream.start() in my code, it is probably throwing the error because of the inner select query (I think Spark is treating it as another query altogether, which requires its own writeStream.start()). Any help?

*2) How to go about it?* I have another approach in mind, i.e., querying the table to get the avg and storing it in a variable, then in a second query simply passing the variable and dividing the second column to produce the appropriate result. But is that the right approach?

*3) Final question*: How do I do the calculation over the entire data and not just the latest batch? Do I need to keep appending somewhere and repeatedly use it? My average and all the rows of Col2 shall change with every new piece of incoming data.

*Code -*

from pyspark.sql import SparkSession
import time
from pyspark.sql.functions import split, col

class test:

    spark = SparkSession.builder \
        .appName("Stream_Col_Oper_Spark") \
        .getOrCreate()

    data = spark.readStream.format("kafka") \
        .option("startingOffsets", "latest") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "test1") \
        .load()

    ID = data.select('value') \
        .withColumn('value', data.value.cast("string")) \
        .withColumn("Col1", split(col("value"), ",").getItem(0)) \
        .withColumn("Col2", split(col("value"), ",").getItem(1)) \
        .drop('value')

    ID.createOrReplaceTempView("transformed_Stream_DF")
    aggregate_func = spark.sql(
        "select t.Col2, (select AVG(Col1) as Avg from transformed_Stream_DF) as myAvg from transformed_Stream_DF t")  # (Col2/(AVG(Col1)) as Col3)")

    # ---For Console Print---
    query = aggregate_func \
        .writeStream \
        .format("console") \
        .start()
    # .outputMode("complete") \
    # ---Console Print ends---

    query.awaitTermination()

# /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py

Thanks,
Aakash.
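As a rough sketch, the running average on its own can also be expressed as a plain streaming aggregation instead of a sub-select (this assumes the same ID DataFrame as in the code above, leaves the division step out, and a streaming aggregation needs the complete or update output mode):

from pyspark.sql.functions import avg, col

# global (running) average over everything seen so far on the stream
running_avg = ID.select(col("Col1").cast("double").alias("Col1")) \
    .agg(avg("Col1").alias("running_avg"))

avg_query = running_avg \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()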
Re: OOM when extract big data from MySQL Using JDBC
Hi,

Try increasing your driver memory (spark.driver.memory), since you have a 256GB RAM machine. As you are running your program on only one machine, the following solution might help: https://stackoverflow.com/a/35961952/5301646

Keep trying different driver memory allocations; it might solve your problem.
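A rough sketch of where that setting goes (the 64g value and the script name are only placeholders; spark.driver.memory normally has to be applied before the driver JVM starts, so the spark-submit flag or spark-defaults.conf is the more reliable place for it):

from pyspark.sql import SparkSession

# preferred: set it on the command line so it takes effect before the driver JVM starts, e.g.
#   spark-submit --driver-memory 64g jdbc_extract_job.py
spark = SparkSession.builder \
    .appName("jdbc_extract_job") \
    .config("spark.driver.memory", "64g") \
    .getOrCreate()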
Merge query using spark sql
I am using Spark to run a merge query in PostgreSQL. The way it is being done now: the data to be merged is saved into Postgres as temp tables, and the merge queries are then run in Postgres over a Java SQL connection and statement. So basically the query runs in Postgres. The query logic is: insert into the source table if the row doesn't exist in the source but exists in the temp table, else update. The problem is that both tables have about 400K records, and this whole query takes 20 hours to run. Is there any way to do it in Spark itself, and not run the query in PG, so that this can complete in a reasonable time?

--
Thanks
Deepak
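For illustration, a rough sketch of the flow described above (the table and column names are made up; it assumes PostgreSQL 9.5+ for ON CONFLICT, a unique constraint on id, an existing DataFrame merge_df holding the data to be merged, and the psycopg2 driver for the plain SQL connection):

import psycopg2

# hypothetical connection details, for illustration only
pg_url = "jdbc:postgresql://pg-host:5432/mydb"
pg_props = {"user": "spark", "password": "***", "driver": "org.postgresql.Driver"}

# 1) land the data to be merged in a staging/temp table via Spark JDBC
merge_df.write.jdbc(url=pg_url, table="staging_source", mode="overwrite", properties=pg_props)

# 2) run the set-based merge inside Postgres over a plain connection
conn = psycopg2.connect(host="pg-host", dbname="mydb", user="spark", password="***")
with conn, conn.cursor() as cur:
    cur.execute("""
        INSERT INTO source_table (id, payload)
        SELECT id, payload FROM staging_source
        ON CONFLICT (id) DO UPDATE SET payload = EXCLUDED.payload
    """)
conn.close()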
Re: [Structured Streaming Query] Calculate Running Avg from Kafka feed using SQL query
Any help, guys?
Re: [Structured Streaming Query] Calculate Running Avg from Kafka feed using SQL query
Hi all,

The following is the updated code, where I'm getting the avg in a DF, but the collect() function (to store the value as a variable and pass it to the final select query) is not working. So avg is currently a DataFrame and not a variable with the value stored in it.

New code -

from pyspark.sql import SparkSession
import time
from pyspark.sql.functions import split, col

class test:

    spark = SparkSession.builder \
        .appName("Stream_Col_Oper_Spark") \
        .getOrCreate()

    data = spark.readStream.format("kafka") \
        .option("startingOffsets", "latest") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "test1") \
        .load()

    ID = data.select('value') \
        .withColumn('value', data.value.cast("string")) \
        .withColumn("Col1", split(col("value"), ",").getItem(0)) \
        .withColumn("Col2", split(col("value"), ",").getItem(1)) \
        .drop('value')

    ID.createOrReplaceTempView("transformed_Stream_DF")

    avg = spark.sql("select AVG(Col1) as Avg from transformed_Stream_DF")  # .collect()[0][0]

    aggregate_func = spark.sql(
        "select Col1, Col2, Col2/{0} as Col3 from transformed_Stream_DF".format(avg))  # (Col2/(AVG(Col1)) as Col3)")

    # ---For Console Print---
    query1 = avg \
        .writeStream \
        .format("console") \
        .outputMode("complete") \
        .start()

    query = aggregate_func \
        .writeStream \
        .format("console") \
        .start()
    # .outputMode("complete") \
    # ---Console Print ends---

    query1.awaitTermination()

# /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py

If I uncomment the collect from the above code and use it, I get the following error -

*pyspark.sql.utils.AnalysisException: u'Queries with streaming sources must be executed with writeStream.start();;\nkafka'*

Any alternative (better) solution to get this job done would suffice too. Any help shall be greatly acknowledged.

Thanks,
Aakash.
Re: is there a way of register python UDF using java API?
Looks like there is spark.udf().registerPython(), like below:

public void registerPython(java.lang.String name, org.apache.spark.sql.execution.python.UserDefinedPythonFunction udf)

Can anyone describe what the *udfDeterministic* parameter does in the method signature below?

public UserDefinedPythonFunction(java.lang.String name, org.apache.spark.api.python.PythonFunction func, org.apache.spark.sql.types.DataType dataType, int pythonEvalType, boolean udfDeterministic) { /* compiled code */ }

On Sun, Apr 1, 2018 at 3:46 PM, kant kodali wrote:

> Hi All,
>
> All of our spark code is in Java, wondering if there is a way to register
> python UDF's using the java API such that the registered UDF's can be used
> using raw spark SQL.
> If there is any other way to achieve this goal please suggest!
>
> Thanks
Re: is there a way of register python UDF using java API?
Hi Kant,

The udfDeterministic flag would be set to false if the results from your UDF are non-deterministic, such as when they are produced from random numbers, so that the Catalyst optimizer will not cache and reuse results.
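On the PySpark side the same idea is exposed as asNondeterministic() on the UDF object. A small sketch (it assumes an existing SparkSession named spark):

import random
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# the results differ on every call, so mark the UDF non-deterministic
# to stop the optimizer from caching or reusing its results
rand_udf = udf(lambda: random.random(), DoubleType()).asNondeterministic()

spark.range(3).withColumn("r", rand_udf()).show()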
Uncaught exception in thread heartbeat-receiver-event-loop-thread
Hi,

I got an error of "Uncaught exception in thread heartbeat-receiver-event-loop-thread". Does this error indicate that some node is too overloaded to be responsive? Thanks!

ERROR Utils: Uncaught exception in thread heartbeat-receiver-event-loop-thread
java.lang.NullPointerException
    at org.apache.spark.util.CollectionAccumulator.value(AccumulatorV2.scala:464)
    at org.apache.spark.util.CollectionAccumulator.value(AccumulatorV2.scala:439)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$6$$anonfun$7.apply(TaskSchedulerImpl.scala:408)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$6$$anonfun$7.apply(TaskSchedulerImpl.scala:408)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$6.apply(TaskSchedulerImpl.scala:408)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$6.apply(TaskSchedulerImpl.scala:407)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
    at org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:407)
    at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2$$anonfun$run$2.apply$mcV$sp(HeartbeatReceiver.scala:129)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1295)
    at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2.run(HeartbeatReceiver.scala:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
How to delete empty columns in df when writing to parquet?
I am trying to read data from Kafka and write it in Parquet format via Spark Streaming. The problem is that the data from Kafka have a variable structure. For example, app one has columns A, B, C, while app two has columns B, C, D. So the DataFrame I read from Kafka has all of the columns A, B, C, D. When I write the DataFrame to Parquet partitioned by app name, the Parquet files for app one also contain column D, which is empty and holds no data at all. So how can I filter out the empty columns when writing the DataFrame to Parquet?

Thanks!

Regards,
Junfeng Chen
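A rough sketch of one way to drop the all-empty columns before writing (shown on a static DataFrame for clarity; with streaming it would have to run per micro-batch; df is assumed to be the DataFrame read from Kafka, and the "appname" column and the output path are placeholders):

from pyspark.sql.functions import count, col

def drop_all_null_columns(df):
    # count() only counts non-null values, so this finds the empty columns in one pass
    non_null = df.agg(*[count(col(c)).alias(c) for c in df.columns]).collect()[0].asDict()
    return df.select(*[c for c in df.columns if non_null[c] > 0])

# write each app's slice without its empty columns
apps = [r["appname"] for r in df.select("appname").distinct().collect()]
for app in apps:
    slice_df = df.where(col("appname") == app)
    drop_all_null_columns(slice_df).write.mode("append") \
        .parquet("/data/output/appname={}".format(app))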
[Spark sql]: Re-execution of same operation takes less time than 1st
Hi,

When we execute the same operation twice, Spark takes less time (about 40% less) the second time than the first. Our operation is like this:

Read 150M rows (spread across multiple parquet files) into a DF.
Read 10M rows (spread across multiple parquet files) into another DF.
Do an intersect operation.

Size of the 150M-row input: 587MB. Size of the 10M-row input: 50MB.

If the first execution takes around 20 sec, the next one takes just 10-12 sec. Is there any specific reason for this? Is there any optimization we can utilize during the first operation?

Regards
Sanjeev
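For reference, the operation being timed boils down to something like this (a sketch; the paths are placeholders and an existing SparkSession named spark is assumed):

big_df = spark.read.parquet("/data/input_150m")     # ~150M rows, ~587MB
small_df = spark.read.parquet("/data/input_10m")    # ~10M rows, ~50MB

result = big_df.intersect(small_df)
result.count()    # around 20 sec on the first run, 10-12 sec on re-execution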
[Spark-sql]: DF parquet read write multiple tasks
Spark: 2.2
Number of cores: 128 (all allocated to Spark)
Filesystem: Alluxio 1.6
Block size on Alluxio: 32MB
Input1 size: 586MB (150M records with only 1 column, an int)
Input2 size: 50MB (10M records with only 1 column, an int)

Input1 is spread across 20 parquet files, each file 29MB (1 Alluxio block per file).
Input2 is also spread across 20 parquet files, each file 2.2MB (1 Alluxio block per file).

Operation: read the parquet files as a DF.

For Input1: the number of tasks created is 120.
For Input2: the number of tasks created is 20.

How is the number of tasks calculated for each? Secondly, if I look at the task details in the UI, I see some tasks with an "Input Size" of only some bytes, while for others it is in MB. Further investigation shows exactly 20 tasks with an input size of around 29MB, while the remaining 100 tasks read only a few bytes each.

We are using parquet-cpp to generate the parquet files and then reading those files in Spark. We want to know how the roughly 120 tasks are generated (it should be 20)? It's blocking our core utilization.

Thanks
Regards
Sanjeev
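For what it's worth, a small sketch of how the split can be inspected and experimented with from the Spark side (it assumes an existing SparkSession named spark; the path and the config values are just examples to experiment with, not a recommendation):

# see how many partitions (tasks) a scan will produce
df1 = spark.read.parquet("/alluxio/input1")
print(df1.rdd.getNumPartitions())

# file-based scans are split according to these settings (defaults: 128MB and 4MB);
# changing them changes how many tasks a 29MB-per-file input is cut into
spark.conf.set("spark.sql.files.maxPartitionBytes", str(32 * 1024 * 1024))
spark.conf.set("spark.sql.files.openCostInBytes", str(4 * 1024 * 1024))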