Re: Performance tuning on Databricks PySpark 2.4.4
For case 1, you can create 3 notebooks and 3 jobs in Databricks, then run them in parallel.

On Wed, 22 Jan 2020 at 3:50 am, anbutech wrote:

> [original question quoted in full; see the post "Performance tuning on Databricks PySpark 2.4.4" below]

--
Best Regards,
Ayan Guha
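One way to implement that suggestion without defining three separate jobs is to launch a parameterised per-table notebook from a single driver notebook with a thread pool. This is only a sketch: the notebook path, widget name and table names below are hypothetical, and dbutils is available only inside a Databricks notebook.

from concurrent.futures import ThreadPoolExecutor

tables = ["table_a", "table_b", "table_c"]  # placeholder table names

def run_count_notebook(table):
    # dbutils.notebook.run blocks until the child notebook finishes and
    # returns whatever the child passes to dbutils.notebook.exit().
    return dbutils.notebook.run("/reports/table_count", 3600, {"table": table})

# Three concurrent notebook runs, one per table.
with ThreadPoolExecutor(max_workers=3) as pool:
    results = list(pool.map(run_count_notebook, tables))

Each run_count_notebook call triggers its own Spark jobs, so the three counts proceed concurrently on the same cluster, much as three separately scheduled Databricks jobs would.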
Accumulator v2
Hello,

We're currently using Spark Streaming (Spark 2.3) for a number of applications. One pattern we've used successfully is to create an accumulator inside a DStream transform statement and accumulate values associated with the RDD as we process the data. A stage-completion listener listens for stage-complete events, retrieves the AccumulableInfo for our custom classes and exports the statistics to our back-end.

We're trying to move more of our applications to Structured Streaming. However, the accumulator pattern does not fit Structured Streaming in an obvious way. In many cases we're able to get basic statistics (e.g. number of input and output events) from the built-in metrics. We need a pattern for more complex statistics (number of errors, number of internal records, etc.). Defining an accumulator on startup and adding statistics to it, we are able to see the numbers, but only as running totals: if we read 10 records in the first trigger and 15 in the second, we see accumulated values of 10 and 25 rather than the per-trigger counts.

There are several options that might allow us to move ahead:

1. Have the AccumulableInfo contain both previous and current counts.
2. Maintain current and previous counts separately.
3. Maintain a mapping of ID to AccumulatorV2 and call accumulator.reset() once we've read the data.

All of these options feel like hacky workarounds. Has anyone encountered this use case? Is there a good pattern to follow?

Regards,

Bryan Jeffrey
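One way to recover per-trigger numbers without new accumulator machinery is essentially option 2: keep the previous total on the driver and report the difference after each micro-batch. Below is a minimal PySpark sketch of that bookkeeping, assuming Spark 2.4+ where foreachBatch is available; the rate source and the print call are stand-ins for the real input and metrics sink, and the same idea applies to a Scala AccumulatorV2.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("per-trigger-metrics").getOrCreate()

record_acc = spark.sparkContext.accumulator(0)  # running total across all triggers
previous = {"records": 0}                       # driver-side snapshot of the last total

def report_batch(batch_df, batch_id):
    # Any executor-side work that bumps the accumulator; counting rows is a stand-in.
    batch_df.foreach(lambda row: record_acc.add(1))
    delta = record_acc.value - previous["records"]
    previous["records"] = record_acc.value
    print("batch {}: {} new records".format(batch_id, delta))  # replace with the real sink

query = (spark.readStream.format("rate").load()
         .writeStream.foreachBatch(report_batch).start())
query.awaitTermination()

Because foreachBatch runs on the driver and the foreach action completes before the function returns, the accumulator value read there already includes this trigger's updates.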
Best approach to write UDF
Hi,

I have written Spark UDFs and I am able to use them in Spark Scala / PySpark through the org.apache.spark.sql.api.java.UDFx API. I would now like to use them in spark-sql through Thrift. I tried to register them with "create function as 'org.my.MyUdf'", however I get the error below when using them:

> org.apache.spark.sql.AnalysisException: No handler for UDF/UDAF/UDTF 'org.my.MyUdf';

I have read here (https://stackoverflow.com/a/56970800/3865083) that only the org.apache.hadoop.hive.ql.exec.UDF API works through Thrift. What is the right way to write a UDF for this?

Thanks

--
nicolas
Performance tuning on Databricks PySpark 2.4.4
Hi sir,

Could you please help me with the two cases below? In Databricks PySpark we are processing terabytes of JSON data read from an AWS S3 bucket.

Case 1:

Currently I'm reading multiple tables sequentially to get the day count from each table. For example, table_list.csv has one column with multiple table names.

from pyspark.sql.functions import expr, lit

year = 2019
month = 12

tablesDF = (spark.read.format("csv")
            .option("header", "false")
            .load("s3a://bucket//source/table_list.csv"))
tabList = tablesDF.toPandas().values.tolist()

for table in tabList:
    tab_name = table[0]

    # Snowflake settings and Snowflake table count()
    sfOptions = {
        "sfURL": "",
        "sfAccount": "",
        "sfUser": "",
        "sfPassword": "",
        "sfDatabase": "",
        "sfSchema": "",
        "sfWarehouse": "",
    }

    # Read the Snowflake count as a dataframe
    sfxdf = (spark.read
             .format("snowflake")
             .options(**sfOptions)
             .option("query",
                     "select y as year, m as month, count(*) as sCount "
                     "from {} where y={} and m={} group by year, month"
                     .format(tab_name, year, month))
             .load())

    # Databricks Delta Lake count
    dbxDF = spark.sql(
        "select y as year, m as month, count(*) as dCount "
        "from db.{} where y={} and m={} group by year, month"
        .format(tab_name, year, month))

    resultDF = (dbxDF.join(sfxdf, on=["year", "month"], how="left_outer")
                .na.fill(0)
                .withColumn("flag_col", expr("dCount == sCount")))

    finalDF = (resultDF.withColumn("table_name", lit(tab_name))
               .select("table_name", "year", "month",
                       "dCount", "sCount", "flag_col"))

    finalDF.coalesce(1).write.format("csv") \
        .option("header", "true").save("s3a://outputs/reportcsv")

Questions:

1) Instead of running the count queries sequentially, table by table, how can I read all the tables from the csv file on S3 in parallel and distribute the jobs across the cluster?

2) Could you please suggest how to optimize the above PySpark code so that all the count queries run at the same time?

Case 2: multiprocessing

Could you please help me achieve multiprocessing for the above PySpark query, so that it runs in parallel in the distributed environment? Using the snippet below, is there any way to run the PySpark code in parallel on the cluster?

import time
import multiprocessing
import pandas as pd

# Creating a pool of 20 processes. You can set this as per your intended
# parallelism and your available resources.
start = time.time()
pool = multiprocessing.Pool(20)

# This will execute get_counts() in parallel, on each element inside input_paths.
# result (a list of dictionaries) is constructed when all executions are completed.
result = pool.map(get_counts, input_paths)

end = time.time()

result_df = pd.DataFrame(result)
# You can use result_df.to_csv() to store the results in a csv.
print(result_df)
print('Time taken: {}'.format(end - start))
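For question 1, a common approach (sketched below) is to keep a single Spark application and drive the existing per-table loop with a thread pool on the driver: each thread submits its own Spark jobs, and the scheduler runs them concurrently across the cluster. Threads are generally preferred over multiprocessing.Pool here because a SparkSession cannot be shared across separate Python processes. The compare_table helper is hypothetical; it stands for the body of the loop above.

from concurrent.futures import ThreadPoolExecutor

def compare_table(tab_name):
    # Hypothetical helper: the body of the for-loop above for a single table,
    # ending with the write of finalDF. Shown here as a stub.
    return tab_name

table_names = [row[0] for row in tabList]

# Each thread triggers its own Spark jobs; the cluster runs them concurrently.
# Setting spark.scheduler.mode=FAIR helps keep one table from starving the rest.
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(compare_table, table_names))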
Re: Extract value from streaming Dataframe to a variable
Thanks for your reply. I'm using Spark 2.3.2. It looks like the foreach operation is only supported for Java and Scala. Is there any alternative for Python?

On Mon, Jan 20, 2020, 5:09 PM Jungtaek Lim wrote:

> Hi,
>
> you can try out foreachBatch to apply the batch query operation to each output of a micro-batch:
>
> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
>
> Hope this helps.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Mon, Jan 20, 2020 at 8:43 PM Nick Dawes wrote:
>
>> Streaming experts, any clues how to achieve this?
>>
>> After extracting a few variables, I need to run them through a REST API for verification and decision making.
>>
>> Thanks for your help.
>>
>> Nick
>>
>> On Fri, Jan 17, 2020, 6:27 PM Nick Dawes wrote:
>>
>>> I need to extract a value from a PySpark structured streaming Dataframe to a string variable to check something.
>>>
>>> I tried this code:
>>>
>>> agentName = kinesisDF.select(kinesisDF.agentName.getItem(0).alias("agentName")).collect()[0][0]
>>>
>>> This works on a non-streaming Dataframe only. In a streaming Dataframe, collect is not supported.
>>>
>>> Any workaround for this?
>>>
>>> Nick
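For what it's worth, foreachBatch is exposed in PySpark from Spark 2.4 onward, so it is not limited to Java/Scala if an upgrade from 2.3.2 is possible. Inside foreachBatch each micro-batch arrives as a normal (non-streaming) DataFrame, so collect() works there. A rough sketch along the lines of the original code, assuming the kinesisDF streaming DataFrame from the earlier message; the REST call is left as a placeholder.

def check_agent(batch_df, batch_id):
    # batch_df is a plain DataFrame here, so collect() is allowed.
    rows = (batch_df
            .select(batch_df.agentName.getItem(0).alias("agentName"))
            .limit(1)
            .collect())
    if rows:
        agent_name = rows[0]["agentName"]
        # Placeholder: call the verification / decision-making REST API here.
        print("batch {}: agentName = {}".format(batch_id, agent_name))

query = kinesisDF.writeStream.foreachBatch(check_agent).start()
query.awaitTermination()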
Call for presentations for ApacheCon North America 2020 now open
Dear Apache enthusiast,

(You’re receiving this message because you are subscribed to one or more project mailing lists at the Apache Software Foundation.)

The call for presentations for ApacheCon North America 2020 is now open at https://apachecon.com/acna2020/cfp

ApacheCon will be held at the Sheraton, New Orleans, September 28th through October 2nd, 2020.

As in past years, ApacheCon will feature tracks focusing on the various technologies within the Apache ecosystem, and so the call for presentations will ask you to select one of those tracks, or “General” if the content falls outside of one of our already-organized tracks. These tracks are:

Karaf
Internet of Things
Fineract
Community
Content Delivery
Solr/Lucene (Search)
Gobblin/Big Data Integration
Ignite
Observability
Cloudstack
Geospatial
Graph
Camel/Integration
Flagon
Tomcat
Cassandra
Groovy
Web/httpd
General/Other

The CFP will close Friday, May 1, 2020 8:00 AM (America/New_York time).

Submit early, submit often, at https://apachecon.com/acna2020/cfp

Rich, for the ApacheCon Planners
Parallelism in custom Receiver
I wrote a custom receiver that can process data from an external source, and I read the doc, which says:

> A DStream is associated with a single receiver. For attaining read parallelism multiple receivers i.e. multiple DStreams need to be created. A receiver is run within an executor. It occupies one core. Ensure that there are enough cores for processing after receiver slots are booked i.e. spark.cores.max should take the receiver slots into account. The receivers are allocated to executors in a round robin fashion.

https://spark.apache.org/docs/latest/streaming-programming-guide.html#important-points-to-remember

So I should be able to launch multiple receivers. But my question is: how do I increase the parallelism of a Receiver? I do not see any parameter that can be tuned according to the doc:

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.receiver.Receiver

val conf = new SparkConf().setMaster("local[*]").setAppName("MyAppName")
val ssc = new StreamingContext(conf, Seconds(1))
val stream = ssc.receiverStream(new MyReceiver())
stream.print()

ssc.start()
Try(ssc.awaitTermination()) match {
  case Success(_)  => println("Finished streaming")
  case Failure(ex) => println(s"exception : $ex")
}

Right now I use local mode, but I would like to learn the strategy for launching multiple receivers for parallelism in both clustered mode and local mode. I appreciate any suggestions!
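As the quoted documentation suggests, receiver parallelism is not a parameter on Receiver itself; the usual pattern is to create several receiver DStreams and union them, so that each receiver takes its own core while processing happens on the remaining cores. Custom receivers can only be written in Scala/Java, so the sketch below uses PySpark's built-in socket receiver purely to illustrate the shape of the pattern (host and ports are placeholders); with MyReceiver the equivalent is calling ssc.receiverStream(new MyReceiver()) once per desired receiver and unioning the results.

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = SparkConf().setMaster("local[4]").setAppName("MultiReceiverSketch")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)

# One receiver per stream; each occupies one core, so local[4] leaves two
# cores for processing after the two receivers are booked.
num_receivers = 2
streams = [ssc.socketTextStream("localhost", 9999 + i) for i in range(num_receivers)]

# Union the per-receiver DStreams into a single DStream for downstream processing.
unified = ssc.union(*streams)
unified.pprint()

ssc.start()
ssc.awaitTermination()

In a cluster the same code applies; the receivers are just placed on executors round-robin, so the executor cores (or spark.cores.max) need to cover the receiver slots plus the processing work.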