Re: Redesign approach for hitting the APIs using PySpark

2022-06-13 Thread Sid
What do you mean by overkill here?

I tried the below approach to iterate over the records in batches of 4k inside a while loop. However, it only runs for the first record. What could be wrong here? I am going through a few SO posts where users found the approach below faster than the withColumn approach:

finalDF = finalDF.select("meta").rdd.map(
    lambda x: call_to_cust_bulk_api(policyUrl, x[0])).toDF()
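For reference, the batched direction I want to end up with (one POST per chunk of records instead of one per row) would look roughly like the mapPartitions sketch below, assuming call_to_cust_bulk_api, policyUrl and finalDF as defined earlier in the thread; the 4000 chunk size and the Row wrapper are placeholders and this is untested:

from itertools import islice
from pyspark.sql import Row

def send_in_batches(rows, batch_size=4000):
    # 'rows' is an iterator over one partition; slice it into API-sized chunks
    rows = iter(rows)
    while True:
        chunk = [r[0] for r in islice(rows, batch_size)]
        if not chunk:
            break
        resp = call_to_cust_bulk_api(policyUrl, chunk)  # one POST per chunk
        yield Row(batch_rows=len(chunk), response=str(resp))

statusDF = finalDF.select("meta").rdd.mapPartitions(send_in_batches).toDF()
statusDF.show()  # transformations are lazy, so an action is needed before any call actually runs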


On Mon, Jun 13, 2022 at 4:13 PM Gourav Sengupta wrote:

> Hi,
>
> >> spark.range(1).createOrReplaceTempView("test")
> >> maximum_records_per_api_call = 40
> >> batch_count = spark.sql("SELECT * FROM test").count() / maximum_records_per_api_call
> >> spark.sql(f"SELECT id, mod(monotonically_increasing_id(), {batch_count}) batch_id FROM test").repartitionByRange("batch_id").createOrReplaceTempView("test_batch")
>
>
> the above code should then be able to be run with a UDF, as long as we are
> able to control the parallelism with the help of the executor count and task
> cpu configuration.
>
> But once again, this is just unnecessary overkill.
>
>
> Regards,
> Gourav Sengupta

Re: Redesign approach for hitting the APIs using PySpark

2022-06-13 Thread Gourav Sengupta
Hi,

>> spark.range(1).createOrReplaceTempView("test")
>> maximum_records_per_api_call = 40
>> batch_count = spark.sql("SELECT * FROM test").count() / maximum_records_per_api_call
>> spark.sql(f"SELECT id, mod(monotonically_increasing_id(), {batch_count}) batch_id FROM test").repartitionByRange("batch_id").createOrReplaceTempView("test_batch")


the above code should then be able to be run with a UDF, as long as we are
able to control the parallelism with the help of the executor count and task
cpu configuration.

But once again, this is just unnecessary overkill.
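Overkill or not, a minimal end-to-end sketch of that idea is below; the table and column names, the payload handling, and the use of mapPartitions in place of a UDF for the actual HTTP call are illustrative assumptions, not tested code:

from pyspark.sql import Row, SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

maximum_records_per_api_call = 40
df = spark.range(1000).withColumnRenamed("id", "record_id")  # stand-in for the real data
batch_count = max(1, int(df.count() / maximum_records_per_api_call))

# mod of a monotonically increasing id gives roughly batch_count groups of about
# maximum_records_per_api_call rows; repartitionByRange then lines partitions up
# with batch_id ranges, so each task sees roughly one batch
batched = (df
           .withColumn("batch_id", F.monotonically_increasing_id() % batch_count)
           .repartitionByRange(batch_count, "batch_id"))

def post_partition(rows):
    payload = [row.asDict() for row in rows]
    if payload:
        # requests.post(bulk_api_url, json=payload)  # hypothetical endpoint, omitted here
        yield Row(rows_in_batch=len(payload))

batched.rdd.mapPartitions(post_partition).toDF().show()

The number of simultaneous POSTs is then bounded by the number of running tasks, which is what the executor count and task cpu settings control.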


Regards,
Gourav Sengupta


Re: Redesign approach for hitting the APIs using PySpark

2022-06-13 Thread Sid
Hi Gourav,

Could you please provide me with some examples?


Re: Redesign approach for hitting the APIs using PySpark

2022-06-13 Thread Gourav Sengupta
Hi,

try to use mod of a monotonically increasing field and then the repartitionByRange function, and see whether Spark automatically serialises it based on the number of executors that you put in the job.

But once again, this is kind of overkill; for fetching data from an API, a simple Python program works quite well.
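For the simple-Python route, a hedged sketch reusing the request shape from the original post (the URL, token and field values are placeholders):

import json
import requests

def push_in_batches(records, url, token, batch_size=4000):
    # one POST per chunk of at most batch_size records
    for start in range(0, len(records), batch_size):
        chunk = records[start:start + batch_size]
        body = {"Token": token, "CustomerName": "", "Object": "", "Data": chunk}
        resp = requests.post(url, data=json.dumps(body),
                             headers={"content-type": "application/json"})
        resp.raise_for_status()
        yield resp.json()

# records = [{"A": "1"}, {"A": "2"}, ...]
# responses = list(push_in_batches(records, policyUrl, token))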


Regards,
Gourav


Re: Redesign approach for hitting the APIs using PySpark

2022-06-13 Thread Sid
Hi Gourav,

Do you have any examples or links, please? That would help me to understand.

Thanks,
Sid


Re: Redesign approach for hitting the APIs using PySpark

2022-06-13 Thread Gourav Sengupta
Hi,
I think that serialising data using Spark is overkill here; why not use normal Python?

Also, have you tried repartition by range? That way you can use the modulus operator to batch things up.
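If the data already sits in a DataFrame, a rough sketch of doing the batching in plain Python without collecting everything at once (toLocalIterator pulls one partition at a time to the driver; the batch size and the commented push call are placeholders, untested):

from itertools import islice

def batches_from_df(df, batch_size=4000):
    # stream rows to the driver and group them into API-sized chunks
    rows = df.toLocalIterator()
    while True:
        chunk = [row.asDict() for row in islice(rows, batch_size)]
        if not chunk:
            break
        yield chunk

# for chunk in batches_from_df(finalDF):
#     call_to_cust_bulk_api(policyUrl, {"Token": token, "CustomerName": "",
#                                       "Object": "", "Data": chunk})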

Regards,
Gourav




Redesign approach for hitting the APIs using PySpark

2022-06-13 Thread Sid
Hi Team,

I am trying to hit POST APIs for the very first time using PySpark.

My end goal is to achieve something like the below:


   1. Generate the data.
   2. Send the data in batches of 4k records, since the API can accept 4k records at once.
   3. Each request body would be as below:

      {
      "Token": "",
      "CustomerName": "",
      "Object": "",
      "Data": [{"A":"1"},{"A":"2"}]
      }

   4. The token will be generated first and then passed to the 'Token' key in the data.

For the above goal, I initially wrote something like the below, which gives a heap error because the data frame is getting created on the driver side and there are a minimum of 1M records.
df = modifiedData  # Assume it to be query results stored as a DF

df = df.withColumn("uniqueID", lit("1"))

df = df.withColumn("row_num", row_number().over(
    Window.partitionBy(col("uniqueID")).orderBy(col("uniqueID"))
))

tokenUrl = ""
# tokenUrl = ""
policyUrl = ""
tokenBody = {"Username": "", "Password": "", "CustomerName": ""}


def get_token(url, payload):
    try:
        print("Getting Token")
        response = requests.request("POST", url, data=payload)
        data = response.json()
        if data['ErrorDescription'] == 'Success':
            token = data['Token']
            print(":::Token Generated")
        else:
            print('TokenNotGeneratedFrom: ')
            # raise TokenNotGeneratedFrom(500, 'Token not Generated from ')
        return token
    except Exception as e:
        print('TokenNotGeneratedFrom: ' + str(e))
        # raise TokenNotGeneratedFrom(500, str(e))


def call_to_cust_bulk_api(url, payload):
    print("Calling Bulk API")
    try:
        # TODO: write code...
        headers = {'content-type': 'application/json'}
        print(":::jsn load")
        # print(json.dumps(payload))
        # print(payload)
        response = requests.post(url, data=json.dumps(payload), headers=headers)
        # print(json.dumps(payload))
        data = response.json()
        return data
    except Exception as e:
        print('ExceptionInPushingDataTo: ' + str(e))
        # raise ExceptionInPushingDataTo(500, str(e))


total_count = df.count()
i = 1
while i < total_count:
    rangeNum = i + 3999
    print("Range Num:::")
    print(rangeNum)
    df1 = df.filter((col("row_num") >= i) & (col("row_num") <= rangeNum))
    df1.cache()
    maxValue = df1.select(max(col("row_num")).alias("max_val")).toPandas()['max_val'].iloc[0]
    finalDF = df1.drop("row_num", "edl_timestamp", "uniqueID")
    print("finalDF count:::", finalDF.count())
    token = get_token(tokenUrl, tokenBody)

    result = json.loads(finalDF.toPandas().to_json(orient="records"))
    # token = get_token(tokenUrl, tokenBody)
    custRequestBody = {
        "Token": token,
        "CustomerName": "",
        "Object": "",
        "Data": result
    }

    # print("Customer Request Body::")
    # print(json.dumps(custRequestBody))
    response = call_to_cust_bulk_api(policyUrl, custRequestBody)
    print(response)
    finalDFStatus = finalDF.withColumn("edl_timestamp",
                                       to_timestamp(lit(F.TimeNow()))).withColumn(
        "status_for_each_batch",
        lit(str(response)))

    print("Max Value:::")
    print(maxValue)
    print("Next I:::")
    i = rangeNum + 1
    print(i)

This is my very first approach to hitting APIs with Spark. Could anyone please help me redesign the approach, or share some links or references that I can study in depth to rectify this myself? How can I scale this?


Any help is much appreciated.

TIA,
Sid