Re: Performance tuning on the Databricks pyspark 2.4.4

2020-01-21 Thread ayan guha
For case 1, you can create 3 notebooks and 3 jobs in Databricks and then run
them in parallel.
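
For reference, a minimal sketch of an alternative that keeps everything in a
single application: wrap the per-table comparison in a function and submit the
tables from a thread pool on the driver, so Spark can schedule the resulting
jobs concurrently on one cluster. The function body below is a hypothetical
stand-in for the logic in the quoted post (only the Delta side is shown), and
the pool size is an arbitrary example.

from concurrent.futures import ThreadPoolExecutor

def compare_counts(tab_name):
    # Hypothetical stand-in for the per-table Snowflake vs. Delta comparison
    # in the quoted post; here only the Delta count is computed.
    return tab_name, spark.sql(
        "select count(*) from db.{} where y=2019 and m=12".format(tab_name)
    ).collect()[0][0]

tables = [row[0] for row in
          spark.read.format("csv").option("header", "false")
               .load("s3a://bucket//source/table_list.csv").collect()]

# Threads share the driver's SparkSession; each call submits its own Spark
# jobs, which the scheduler can run in parallel across the cluster.
with ThreadPoolExecutor(max_workers=8) as executor:
    results = list(executor.map(compare_counts, tables))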

On Wed, 22 Jan 2020 at 3:50 am, anbutech  wrote:

> Hi sir,
>
> Could you please help me with the two cases below? In Databricks PySpark I am
> processing terabytes of JSON data read from an AWS S3 bucket.
>
> Case 1:
>
> Currently I'm reading multiple tables sequentially to get the day count
> from each table.
>
> For example, table_list.csv has one column with multiple table names:
>
> from pyspark.sql.functions import expr, lit
>
> year = 2019
> month = 12
>
> tablesDF = (spark.read.format("csv")
>     .option("header", "false")
>     .load("s3a://bucket//source/table_list.csv"))
> tabList = tablesDF.toPandas().values.tolist()
>
> for table in tabList:
>     tab_name = table[0]
>
>     # Snowflake settings and Snowflake table count()
>     sfOptions = {
>         "sfURL": "",
>         "sfAccount": "",
>         "sfUser": "",
>         "sfPassword": "",
>         "sfDatabase": "",
>         "sfSchema": "",
>         "sfWarehouse": "",
>     }
>
>     # Read the Snowflake data as a dataframe
>     sfxdf = (spark.read
>         .format("snowflake")
>         .options(**sfOptions)
>         .option("query",
>                 "select y as year, m as month, count(*) as sCount "
>                 "from {} where y={} and m={} "
>                 "group by year, month".format(tab_name, year, month))
>         .load())
>
>     # Databricks Delta Lake count
>     dbxDF = spark.sql(
>         "select y as year, m as month, count(*) as dCount "
>         "from db.{} where y={} and m={} "
>         "group by year, month".format(tab_name, year, month))
>
>     resultDF = (dbxDF.join(sfxdf, on=['year', 'month'], how='left_outer')
>         .na.fill(0)
>         .withColumn("flag_col", expr("dCount == sCount")))
>
>     finalDF = (resultDF.withColumn("table_name", lit(tab_name))
>         .select("table_name", "year", "month", "dCount", "sCount", "flag_col"))
>
>     # Note: this path is reused on every loop iteration, so a per-table suffix
>     # or an explicit write mode is needed to avoid "path already exists" errors.
>     finalDF.coalesce(1).write.format('csv').option('header', 'true') \
>         .save("s3a://outputs/reportcsv")
>
> Questions:
>
> 1) Instead of running the count queries sequentially, one table at a time,
> how can I read all the tables listed in the CSV file on S3 in parallel and
> distribute the jobs across the cluster?
>
> 2) Could you please explain how to optimize the above PySpark code so that
> all the count queries run in parallel at the same time?
>
>
>
> Case 2:
>
> Multiprocessing case:
>
> Could you please help me achieve multiprocessing for the above PySpark
> query so that it runs in parallel in a distributed environment?
>
> Is there any way to achieve parallel processing of the PySpark code on the
> cluster using the snippet below?
>
> import multiprocessing
> import time
>
> import pandas as pd
>
> # Creating a pool of 20 processes. You can set this as per your intended
> # parallelism and your available resources.
> start = time.time()
> pool = multiprocessing.Pool(20)
>
> # This will execute get_counts() in parallel, on each element inside
> # input_paths. result (a list of dictionaries) is constructed when all
> # executions are completed.
> result = pool.map(get_counts, input_paths)
>
> end = time.time()
>
> result_df = pd.DataFrame(result)
> # You can use result_df.to_csv() to store the results in a csv.
> print(result_df)
> print('Time taken: {}'.format(end - start))
>
>
>
--
Best Regards,
Ayan Guha


Accumulator v2

2020-01-21 Thread Bryan Jeffrey
Hello.

We're currently using Spark streaming (Spark 2.3) for a number of
applications. One pattern we've used successfully is to generate an
accumulator inside a DStream transform statement.  We then accumulate
values associated with the RDD as we process the data.  A stage completion
listener listens for stage complete events, retrieves the AccumulableInfo
for our custom classes, and exhausts the statistics to our back-end.

We're trying to move more of our applications to using Structured
Streaming.  However, the accumulator pattern does not seem to obviously fit
Structured Streaming.  In many cases we're able to see basic statistics
(e.g. # input and # output events) from the built-in statistics.  We need
to determine a pattern for more complex statistics (# errors, # of internal
records, etc.).  Defining an accumulator on startup and adding statistics,
we're able to see the statistics - but only as running totals rather than
per-trigger updates - so if we read 10 records in the first trigger and 15
in the second trigger, we see accumulated values of 10 and 25.

There are several options that might allow us to move ahead:
1. We could have the AccumulableInfo contain previous counts and current
counts
2. We could maintain current and previous counts separately
3. We could maintain a list of ID to AccumulatorV2 and then call
accumulator.reset() once we've read data

All of these options seem a little bit like hacky workarounds.  Has anyone
encountered this use-case?  Is there a good pattern to follow?
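
For what it's worth, a minimal, language-agnostic sketch (written in Python)
of option 2: keep the previous cumulative total per metric and derive the
per-trigger delta from it. The class and metric names here are hypothetical.

class DeltaTracker:
    """Turns cumulative accumulator totals into per-trigger deltas."""

    def __init__(self):
        self.previous = {}  # metric name -> last cumulative total seen

    def delta(self, name, cumulative_total):
        # Per-trigger value = current running total minus the previous snapshot.
        prev = self.previous.get(name, 0)
        self.previous[name] = cumulative_total
        return cumulative_total - prev

tracker = DeltaTracker()
print(tracker.delta("input_records", 10))  # first trigger  -> 10
print(tracker.delta("input_records", 25))  # second trigger -> 15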

Regards,

Bryan Jeffrey


Best approach to write UDF

2020-01-21 Thread Nicolas Paris
Hi

I have written Spark UDFs and I am able to use them in Spark Scala /
PySpark by using the org.apache.spark.sql.api.java.UDFx API.

I'd like to use them in spark-sql through Thrift. I tried to create the
functions with "create function as 'org.my.MyUdf'"; however, I get the
error below when using one:

> org.apache.spark.sql.AnalysisException: No handler for UDF/UDAF/UDTF 
> 'org.my.MyUdf'; 

I have read (https://stackoverflow.com/a/56970800/3865083) that only the
org.apache.hadoop.hive.ql.exec.UDF API works through Thrift.

How can one write UDFs the right way for this?
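
For reference, a session-scoped workaround sketch in PySpark, assuming
org.my.MyUdf implements org.apache.spark.sql.api.java.UDF1 and returns a
string (the return type, the registered function name, and the table/column
in the query are assumptions). This only registers the function for the
current session, so it does not solve CREATE FUNCTION through the Thrift
server:

from pyspark.sql.types import StringType

# Register the existing UDFx-based class for SQL use in this session only.
spark.udf.registerJavaFunction("my_udf", "org.my.MyUdf", StringType())

spark.sql("SELECT my_udf(some_column) FROM some_table").show()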

Thanks

-- 
nicolas




Performance tuning on the Databricks pyspark 2.4.4

2020-01-21 Thread anbutech
Hi sir,

Could you please help me with the two cases below? In Databricks PySpark I am
processing terabytes of JSON data read from an AWS S3 bucket.

Case 1:

Currently I'm reading multiple tables sequentially to get the day count
from each table.

For example, table_list.csv has one column with multiple table names:

from pyspark.sql.functions import expr, lit

year = 2019
month = 12

tablesDF = (spark.read.format("csv")
    .option("header", "false")
    .load("s3a://bucket//source/table_list.csv"))
tabList = tablesDF.toPandas().values.tolist()

for table in tabList:
    tab_name = table[0]

    # Snowflake settings and Snowflake table count()
    sfOptions = {
        "sfURL": "",
        "sfAccount": "",
        "sfUser": "",
        "sfPassword": "",
        "sfDatabase": "",
        "sfSchema": "",
        "sfWarehouse": "",
    }

    # Read the Snowflake data as a dataframe
    sfxdf = (spark.read
        .format("snowflake")
        .options(**sfOptions)
        .option("query",
                "select y as year, m as month, count(*) as sCount "
                "from {} where y={} and m={} "
                "group by year, month".format(tab_name, year, month))
        .load())

    # Databricks Delta Lake count
    dbxDF = spark.sql(
        "select y as year, m as month, count(*) as dCount "
        "from db.{} where y={} and m={} "
        "group by year, month".format(tab_name, year, month))

    resultDF = (dbxDF.join(sfxdf, on=['year', 'month'], how='left_outer')
        .na.fill(0)
        .withColumn("flag_col", expr("dCount == sCount")))

    finalDF = (resultDF.withColumn("table_name", lit(tab_name))
        .select("table_name", "year", "month", "dCount", "sCount", "flag_col"))

    # Note: this path is reused on every loop iteration, so a per-table suffix
    # or an explicit write mode is needed to avoid "path already exists" errors.
    finalDF.coalesce(1).write.format('csv').option('header', 'true') \
        .save("s3a://outputs/reportcsv")

Questions:

1) Instead of running the count queries sequentially, one table at a time,
how can I read all the tables listed in the CSV file on S3 in parallel and
distribute the jobs across the cluster?

2) Could you please explain how to optimize the above PySpark code so that
all the count queries run in parallel at the same time?



Case 2:

Multiprocessing case:

Could you please help me achieve multiprocessing for the above PySpark
query so that it runs in parallel in a distributed environment?

Is there any way to achieve parallel processing of the PySpark code on the
cluster using the snippet below?

import multiprocessing
import time

import pandas as pd

# Creating a pool of 20 processes. You can set this as per your intended
# parallelism and your available resources.
start = time.time()
pool = multiprocessing.Pool(20)

# This will execute get_counts() in parallel, on each element inside
# input_paths. result (a list of dictionaries) is constructed when all
# executions are completed.
result = pool.map(get_counts, input_paths)

end = time.time()

result_df = pd.DataFrame(result)
# You can use result_df.to_csv() to store the results in a csv.
print(result_df)
print('Time taken: {}'.format(end - start))
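
For what it's worth, a hedged sketch of how the snippet above could be adapted.
The body of get_counts is an assumption (it reuses the query shape from case 1
and takes a table name rather than a path), and multiprocessing.pool.ThreadPool
is used instead of multiprocessing.Pool because separate worker processes
cannot easily share the driver's SparkSession, while threads can.

from multiprocessing.pool import ThreadPool

def get_counts(tab_name, year=2019, month=12):
    # Hypothetical implementation: compare the Delta Lake and Snowflake counts
    # for one table and return a dict row for the final pandas DataFrame.
    dcount = spark.sql(
        "select count(*) as dCount from db.{} where y={} and m={}"
        .format(tab_name, year, month)).collect()[0][0]
    scount = (spark.read.format("snowflake").options(**sfOptions)
              .option("query",
                      "select count(*) as sCount from {} where y={} and m={}"
                      .format(tab_name, year, month))
              .load().collect()[0][0])
    return {"table_name": tab_name, "year": year, "month": month,
            "dCount": dcount, "sCount": scount, "flag_col": dcount == scount}

pool = ThreadPool(20)  # threads share the SparkSession in the driver
result = pool.map(get_counts, [t[0] for t in tabList])
pool.close()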






Re: Extract value from streaming Dataframe to a variable

2020-01-21 Thread Nick Dawes
Thanks for your reply.

I'm using Spark 2.3.2. It looks like the foreach operation is only supported
for Java and Scala. Is there any alternative for Python?
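
For reference, a minimal PySpark sketch of the foreachBatch approach suggested
below. foreachBatch is available in the Python API from Spark 2.4, so this
assumes an upgrade from 2.3.2; it reuses the kinesisDF and agentName names from
the original question, and the REST call is left as a placeholder.

def check_agent(batch_df, batch_id):
    # batch_df is a regular (non-streaming) DataFrame, so collect() works here.
    rows = (batch_df
            .select(batch_df.agentName.getItem(0).alias("agentName"))
            .limit(1)
            .collect())
    if rows:
        agent_name = rows[0]["agentName"]
        # Call the REST API / decision logic with agent_name here.
        print(batch_id, agent_name)

query = (kinesisDF.writeStream
         .foreachBatch(check_agent)
         .start())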

On Mon, Jan 20, 2020, 5:09 PM Jungtaek Lim 
wrote:

> Hi,
>
> you can try out foreachBatch to apply batch query operations to each
> output of a micro-batch:
>
> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
>
> Hope this helps.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
>
> On Mon, Jan 20, 2020 at 8:43 PM Nick Dawes  wrote:
>
>> Streaming experts, any clues how to achieve this?
>>
>> After extracting few variables, I need to run them through a REST API for
>> verification and decision making.
>>
>> Thanks for your help.
>>
>> Nick
>>
>> On Fri, Jan 17, 2020, 6:27 PM Nick Dawes  wrote:
>>
>>> I need to extract a value from a PySpark structured streaming Dataframe
>>> to a string variable to check something.
>>>
>>> I tried this code.
>>>
>>> agentName =
>>> kinesisDF.select(kinesisDF.agentName.getItem(0).alias("agentName")).collect()[0][0]
>>>
>>> This works on a non-streaming Dataframe only. In a streaming Dataframe,
>>> collect is not supported.
>>>
>>> Any workaround for this?
>>>
>>> Nick
>>>
>>>
>>>


Call for presentations for ApacheCon North America 2020 now open

2020-01-21 Thread Rich Bowen

Dear Apache enthusiast,

(You’re receiving this message because you are subscribed to one or more 
project mailing lists at the Apache Software Foundation.)


The call for presentations for ApacheCon North America 2020 is now open 
at https://apachecon.com/acna2020/cfp


ApacheCon will be held at the Sheraton, New Orleans, September 28th 
through October 2nd, 2020.


As in past years, ApacheCon will feature tracks focusing on the various 
technologies within the Apache ecosystem, and so the call for 
presentations will ask you to select one of those tracks, or “General” 
if the content falls outside of one of our already-organized tracks. 
These tracks are:


Karaf
Internet of Things
Fineract
Community
Content Delivery
Solr/Lucene (Search)
Gobblin/Big Data Integration
Ignite
Observability
Cloudstack
Geospatial
Graph
Camel/Integration
Flagon
Tomcat
Cassandra
Groovy
Web/httpd
General/Other

The CFP will close Friday, May 1, 2020 8:00 AM (America/New_York time).

Submit early, submit often, at https://apachecon.com/acna2020/cfp

Rich, for the ApacheCon Planners




Parallelism in custom Receiver

2020-01-21 Thread hamishberridge
I wrote a custom receiver that can process data from an external source, and I
read in the doc:

    A DStream is associated with a single receiver. For attaining read 
parallelism multiple receivers i.e. multiple DStreams need to be created. A 
receiver is run within an executor. It occupies one core. Ensure that there are 
enough cores for processing after receiver slots are booked i.e. 
spark.cores.max should take the receiver slots into account. The receivers are 
allocated to executors in a round robin fashion.

https://spark.apache.org/docs/latest/streaming-programming-guide.html#important-points-to-remember

So I should be able to launch multiple receivers. But my question is: how do I
increase the parallelism of a Receiver? I do not see any parameter that can be
tuned according to the doc -
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.receiver.Receiver

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import scala.util.{Failure, Success, Try}

    val conf = new SparkConf().setMaster("local[*]").setAppName("MyAppName")
    val ssc = new StreamingContext(conf, Seconds(1))
    val stream = ssc.receiverStream(new MyReceiver())
    stream.print()
    ssc.start()
    Try(ssc.awaitTermination()) match {
      case Success(_)  => println("Finished streaming")
      case Failure(ex) => println(s"exception : $ex")
    }

Right now I use local mode, but I would like to learn strategies for launching
multiple receivers for parallelism in both cluster mode and local mode.
Appreciate any suggestions!