Re: RESTful Operations

2020-01-19 Thread Chris Teoh
Maybe something like Livy; otherwise, roll your own REST API and have it
start a Spark job.
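
For illustration, here is a minimal sketch of the Livy route, assuming a Livy
server at livy-host:8998; the jar path, class name and payload handling are
placeholders, not anything from this thread:

import time

import requests

LIVY_URL = "http://livy-host:8998"  # hypothetical Livy endpoint


def submit_spark_job(client_payload):
    # Submit a Spark batch through Livy's POST /batches API.
    batch = {
        "file": "s3a://my-bucket/jars/my-spark-job.jar",  # hypothetical job artifact
        "className": "com.example.MySparkJob",            # hypothetical main class
        "args": [client_payload]                           # forward the client's request data
    }
    resp = requests.post(LIVY_URL + "/batches", json=batch)
    resp.raise_for_status()
    batch_id = resp.json()["id"]

    # Poll GET /batches/{id}/state until Livy reports a terminal state.
    while True:
        state = requests.get(
            "{}/batches/{}/state".format(LIVY_URL, batch_id)).json()["state"]
        if state in ("success", "dead", "killed"):
            return state
        time.sleep(5)

Your own REST layer (Flask, FastAPI, etc.) would call something like this from
its request handler, or shell out to spark-submit instead if you don't want to
run Livy.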

On Mon, 20 Jan 2020 at 06:55,  wrote:

> I am new to Spark. The task I want to accomplish is to let a client send HTTP
> requests and then have Spark process those requests for further operations.
> However, searching Spark's website docs:
>
>
> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.package
> 
>
> https://spark.apache.org/docs/latest/
>
> I do not find any place mentioning this. Also, most of the internet results
> relate to Spark Job Server.
>
> Any place I should start if I want to use Spark for such a purpose?
>
> Thanks
>


-- 
Chris


RESTful Operations

2020-01-19 Thread hamishberridge
I am new to Spark. The task I want to accomplish is to let a client send HTTP
requests and then have Spark process those requests for further operations.
However, searching Spark's website docs:

    
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.package

    https://spark.apache.org/docs/latest/

I do not find any place mentioning this. Also, most of the internet results
relate to Spark Job Server.

Any place I should start if I want to use Spark for such a purpose?

Thanks 


Re: Record count query parallel processing in databricks spark delta lake

2020-01-19 Thread Farhan Misarwala
Hi Anbutech,

If I am not mistaken, I believe you are trying to read multiple
dataframes from around 150 different paths (in your case the Kafka
topics) to count their records. You have all these paths stored in a
CSV with columns year, month, day and hour.

Here is what I came up with; I have been using this approach for
similar kinds of problems, and it has worked for me in the past in
almost all of my cases. This should give you an idea.

Thanks.


import multiprocessing
import time

import pandas as pd
from pyspark.sql import SparkSession

if __name__ == '__main__':
    # Suppose you have hourly data for the 1st three hours of the year, since 2018.
    intervals = [
        {"year": 2018, "month": 1, "day": 1, "hour": 1},
        {"year": 2018, "month": 1, "day": 1, "hour": 2},
        {"year": 2018, "month": 1, "day": 1, "hour": 3},
        {"year": 2019, "month": 1, "day": 1, "hour": 1},
        {"year": 2019, "month": 1, "day": 1, "hour": 2},
        {"year": 2019, "month": 1, "day": 1, "hour": 3},
        {"year": 2020, "month": 1, "day": 1, "hour": 1},
        {"year": 2020, "month": 1, "day": 1, "hour": 2},
        {"year": 2020, "month": 1, "day": 1, "hour": 3}
    ]

    # A DataFrame from my list of dictionaries; you can use
    # pd.read_csv to read your `topics_list.csv` into a DataFrame.
    input_paths_df = pd.DataFrame(list(intervals))
    print(input_paths_df)

    # In your case, reading your csv file (now a df) into a list of dictionaries.
    input_paths = input_paths_df.to_dict('records')

    # Since this is on my local machine, in PyCharm <3
    spark = SparkSession.builder \
        .master("local[*]") \
        .appName('Reading topics in parallel') \
        .getOrCreate()
    spark.conf.set("spark.sql.shuffle.partitions", 10)
    spark.conf.set("spark.executor.pyspark.memory", "2G")


    def get_counts(record):
        year_ = record['year']
        month_ = record['month']
        day_ = record['day']
        hour_ = record['hour']
        path = "s3a://kafka-bucket_name/topic_name/{}/{}/{}/{}".format(
            year_, month_, day_, hour_)
        df = spark.read.json(path)
        # No need for createOrReplaceTempView, just do a df.count().
        record['count'] = df.count()
        # time.sleep(2)
        record['filePath'] = path

        return record


    # Creating a pool of 20 processes. You can set this as per your
    # intended parallelism and your available resources.
    start = time.time()
    pool = multiprocessing.Pool(20)
    # This will execute get_counts() in parallel, on each element inside input_paths.
    # result (a list of dictionaries) is constructed when all executions are completed.
    result = pool.map(get_counts, input_paths)
    end = time.time()

    result_df = pd.DataFrame(result)
    # You can use result_df.to_csv() to store the results in a csv.
    print(result_df)
    print('Time taken: {}'.format(end - start))


On Fri, Jan 17, 2020 at 11:49 PM anbutech  wrote:

> Hi,
>
> I have a question on the design of a monitoring PySpark script for the large
> amount of source JSON data coming from more than 100 Kafka topics.
> These topics are stored under separate buckets in AWS S3. Each of the Kafka
> topics has terabytes of JSON data partitioned by year, month, day and hour,
> and each hour has a lot of JSON files in .gz compression format.
>
> What is the best way to process terabytes of data read from S3 under the
> year/month/day/hour partitions for all the topic sources?
>
> We are using Delta Lake on the Databricks platform. The query is taking a
> lot of time to get the count of records by year, month and date.
>
> What is the best approach to handle terabytes of data and get the record
> counts for all the days?
>
> Please help me with the below problem:
>
> topics_list.csv
> --
> I'm planning to put all 150 topics in the csv file, then read and process
> the data to get the daily record count.
>
> I have to iterate over the topics from the csv file one by one, using a for
> loop or other options, passing the year, month and date arguments
> to get the record count for the particular day for all the topics.
>
> df = spark.read.json("s3a://kafka-bucket_name/topic_name/year/month/day/hour/")
>
> df.createOrReplaceTempView("topic1_source")
>
> spark.sql("select count(1) from topic1_source")
>
> Could you help me or suggest a good approach to run the query in parallel
> for all 150 topics to get the daily record count effectively, using Apache
> Spark with Delta Lake in Databricks?
>
> thanks


Re: Does explode lead to more usage of memory

2020-01-19 Thread Chris Teoh
It depends on the use case: if you have to join, you're saving a join and a
shuffle by having it already in an array.

If you explode, at least sort within partitions so you get predicate
pushdown when you read the data next time.
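
For example, a rough PySpark sketch of that pattern; the column names come
from the schema in the quoted question below, and the paths are placeholders:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode

spark = SparkSession.builder.getOrCreate()

# Hypothetical input holding the user/language/orders-array schema.
df = spark.read.parquet("s3a://bucket/users_with_orders/")

# Explode the array and flatten the struct fields into top-level columns.
exploded = (df
            .select(col("user"), col("language"),
                    explode(col("orders")).alias("order"))
            .select("user", "language", "order.id", "order.amount"))

# Sort within partitions on the column you expect to filter on later, so the
# per-file min/max statistics are tight and reads can skip files via predicate
# pushdown.
(exploded
 .sortWithinPartitions("id")
 .write
 .mode("overwrite")
 .parquet("s3a://bucket/orders_exploded/"))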

On Sun, 19 Jan 2020, 1:19 pm Jörn Franke,  wrote:

> Why not two tables that you can then join? This would be the standard
> way. It depends on what your full use case is, what volumes / orders you
> expect on average, and what the aggregations and filters look like. The
> example below states that you do a select all on the table.
>
> > On 19.01.2020, at 01:50, V0lleyBallJunki3  wrote:
> >
> > I am using a dataframe that has a structure like this:
> >
> > root
> >  |-- orders: array (nullable = true)
> >  |    |-- element: struct (containsNull = true)
> >  |    |    |-- amount: double (nullable = true)
> >  |    |    |-- id: string (nullable = true)
> >  |-- user: string (nullable = true)
> >  |-- language: string (nullable = true)
> >
> > Each user has multiple orders. Now if I explode orders like this:
> >
> > df.select($"user", explode($"orders").as("order")). Each order element will
> > become a row with a duplicated user and language. I was wondering whether
> > Spark actually converts each order element into a separate row in memory, or
> > whether it is just logical. Because if a single user has 1000 orders, then
> > wouldn't it lead to a lot more memory consumption, since it duplicates user
> > and language 1000 times (once for each order) in memory?