Hi,
This adds a column with value "1" (string) *in all rows*:
    df = df.withColumn("uniqueID", lit("1"))
This numbers the rows within each group of rows that have the same |uniqueID|,
*which are all rows*. The window does not make much sense.
And it orders all rows that have the same |uniqueID| by |uniqueID|. That does
not make much sense either.
    df = df.withColumn("row_num", row_number().over(
        Window.partitionBy(col("uniqueID")).orderBy(col("uniqueID"))))
Then it looks like it takes the first 4000 rows (row_num from 1 to 4000)
and tries to send them via HTTP POST. Then it moves the range by one and
sends rows 2 to 4001 (mostly overlapping with the first POST).
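To make the overlap concrete, here is a small plain-Python sketch (no Spark involved) that compares ranges that move by one row with ranges that do not overlap. The total of 10000 rows and the helper names are assumptions chosen only for illustration; only the batch size of 4000 comes from your code.

```python
def sliding_ranges(total_rows, batch_size):
    """Ranges that move by one row each time, as described above."""
    return [(start, start + batch_size - 1)
            for start in range(1, total_rows - batch_size + 2)]


def disjoint_ranges(total_rows, batch_size):
    """Non-overlapping ranges: every row falls into exactly one range."""
    return [(start, min(start + batch_size - 1, total_rows))
            for start in range(1, total_rows + 1, batch_size)]


# total_rows=10000 is an assumed figure, used only for illustration
sliding = sliding_ranges(total_rows=10000, batch_size=4000)
disjoint = disjoint_ranges(total_rows=10000, batch_size=4000)

print(sliding[:2])   # [(1, 4000), (2, 4001)]
print(len(sliding))  # 6001
print(disjoint)      # [(1, 4000), (4001, 8000), (8001, 10000)]
```

With these assumed numbers, moving the range by one row results in 6001 requests, while disjoint ranges need only three.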
It is not clear if the "Data" field is meant to be all rows or only a
single row. Either way, this is not what happens. Please consider the
difference between a Column and a DataFrame in Spark. This is very
different from Pandas.
I think you have to rethink your approach. Spark is meant for big data
that is processed in parallel, whereas this approach is iterative and
single-threaded.
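One possible direction, as a rough sketch only: handle the rows partition by partition and send them in disjoint batches. The function below is plain Python so it can be tried without a cluster. The names |post_partition| and |send| are hypothetical, |send| stands in for whatever performs your HTTP POST, and putting a list of records into the "Data" field is my assumption, since it is not clear what that field is meant to contain.

```python
import json
from itertools import islice


def post_partition(rows, send, batch_size=4000):
    """Send the rows of one partition in disjoint batches.

    rows: iterable of dict-like records (column name -> value)
    send: hypothetical callable that performs the HTTP POST for one payload
    Returns the number of rows that were handed to send.
    """
    rows = iter(rows)
    sent = 0
    while True:
        # Take the next batch_size rows; no row appears in two batches
        batch = list(islice(rows, batch_size))
        if not batch:
            break
        # Assumption: the "Data" field carries a list of records
        payload = json.dumps({"Data": [dict(r) for r in batch]})
        send(payload)
        sent += len(batch)
    return sent


# Local dry run: collect the payloads instead of posting them
collected = []
rows = [{"A": "a%d" % i, "B": "b%d" % i} for i in range(5)]
count = post_partition(rows, collected.append, batch_size=2)
print(count, len(collected))  # 5 3
```

In PySpark, something like this could be passed to |DataFrame.foreachPartition|, converting each |Row| with |asDict()| first. Each partition is then processed by its own task, so the requests are no longer driven from a single loop.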
Enrico
On 10.06.22 at 16:01, Sid wrote:
Hi Enrico,
Thanks for your time. Much appreciated.
I am expecting the payload to be a JSON string representing a record like
the one below:
{"A":"some_value","B":"some_value"}
Where A and B are the columns in my dataset.
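For illustration only, a string of exactly that shape can be produced in plain Python as sketched below. The dictionary is a stand-in for one row of the dataset, and the compact separators are an assumption made to match the sample above character for character.

```python
import json

# Stand-in for a single row with columns A and B
record = {"A": "some_value", "B": "some_value"}

# separators=(",", ":") drops the spaces json.dumps inserts by default
payload = json.dumps(record, separators=(",", ":"))
print(payload)  # {"A":"some_value","B":"some_value"}
```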
On Fri, Jun 10, 2022 at 6:09 PM Enrico Minack <i...@enrico.minack.dev>
wrote:
Sid,
I just realized you are using the Python API here. Then
|struct(*colsListToBePassed)| should be correct, given it takes
a list of strings.
Your method |call_to_cust_bulk_api| takes argument |payload|,
which is a |Column|. This is then used in |custRequestBody|.
That is a pretty strange use of a column expression. What do you
expect |print(payload)| to be?
I recommend splitting that complex command into multiple commands
to find out what "an error of column not iterable" refers to.
Enrico
On 10.06.22 at 13:39, Enrico Minack wrote:
Hi Sid,
    finalDF = finalDF.repartition(finalDF.rdd.getNumPartitions())
        .withColumn("status_for_batch", call_to_cust_bulk_api(policyUrl,
            to_json(struct(*colsListToBePassed))))
You are calling |withColumn| with the result of
|call_to_cust_bulk_api| as the second argument. That result
looks like it is of type string. But |withColumn| expects type
|Column|. You can turn that string into a |Column| using |lit|:
    finalDF = finalDF.repartition(finalDF.rdd.getNumPartitions())
        .withColumn("status_for_batch",
            lit(call_to_cust_bulk_api(policyUrl,
                to_json(struct(*colsListToBePassed)))))
You are saying that gives you an error of column not iterable. I
reckon the |struct(*colsListToBePassed)| is wrong.
Method |struct| requires a single string followed by a list of
strings. Given your |colsListToBePassed| is a list of strings,
this does not work. Try:
    struct(colsListToBePassed.head, colsListToBePassed.tail: _*)
Alternatively, |struct| requires a list of |Column|, so try this:
    struct(colsListToBePassed.map(col): _*)
The API is pretty clear about the types it expects.
If you are still getting errors, please paste the code
and the error.
Enrico
On 09.06.22 at 21:31, Sid wrote:
Hi Experts,
I am facing a problem while passing a column to a method.
The problem is described in detail here:
https://stackoverflow.com/questions/72565095/how-to-pass-columns-as-a-json-record-to-the-api-method-using-pyspark
TIA,
Sid