Hi,

This adds a column with the string value "1" to all rows:

df = df.withColumn("uniqueID", lit("1"))

This then numbers all rows that have the same |uniqueID|, *which are all rows*, so the window does not make much sense. It also orders the rows that have the same |uniqueID| by |uniqueID|, which does not make much sense either. Worse, partitioning by a constant moves all rows into a single window partition, processed by a single task:

df = df.withColumn("row_num", row_number().over(
    Window.partitionBy(col("uniqueID")).orderBy(col("uniqueID"))
))

Then it looks like it takes the first 4000 rows (row_num 1 to 4000) and tries to send them via HTTP POST. Then it moves the range by one and sends rows 2 to 4001, which mostly overlaps with the first POST.

It is not clear whether the "Data" field is meant to contain all rows or only a single row. Either way, this is not what happens. Please consider the difference between a Column and a DataFrame in Spark; this is very different from Pandas.
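
A quick sketch of the difference, assuming an existing SparkSession |spark| and made-up data:

from pyspark.sql.functions import col

df = spark.createDataFrame([("x", 1), ("y", 2)], ["A", "B"])

# a Column is just an unevaluated expression, there is no data behind it
c = col("A")
print(c)  # Column<'A'>

# a DataFrame holds the (distributed) rows; the Column expression is
# only evaluated when used in a DataFrame transformation
df.select(c).show()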

I think you have to rethink your approach. Using Spark means big data. This approach is iterative and single-threaded.
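
If each POST is supposed to carry one batch of rows, a common pattern is to send non-overlapping batches from within each partition, so the work runs on the executors in parallel. A rough sketch (the endpoint URL, the batch size, and the use of the |requests| library are assumptions, not your actual API):

import requests

def post_partition(rows):
    # hypothetical endpoint - replace with your actual API
    url = "https://example.com/bulk"
    batch = []
    for row in rows:
        batch.append(row.asDict())
        if len(batch) == 4000:
            requests.post(url, json=batch)  # one POST per full, non-overlapping batch
            batch = []
    if batch:
        requests.post(url, json=batch)  # POST the remainder

# one call per partition, executed on the executors in parallel
df.foreachPartition(post_partition)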

Enrico


On 10.06.22 16:01, Sid wrote:
Hi Enrico,

Thanks for your time. Much appreciated.

I am expecting the payload to be a JSON string representing one record, like below:

{"A":"some_value","B":"some_value"}

where A and B are the columns in my dataset.
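
For example, something like this sketch (with made-up values, assuming an existing SparkSession |spark|) produces one such string per row:

from pyspark.sql.functions import to_json, struct

df = spark.createDataFrame([("a1", "b1"), ("a2", "b2")], ["A", "B"])

# one JSON string per row, e.g. {"A":"a1","B":"b1"}
df.select(to_json(struct("A", "B")).alias("payload")).show(truncate=False)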


On Fri, Jun 10, 2022 at 6:09 PM Enrico Minack <i...@enrico.minack.dev> wrote:

    Sid,

    just recognized you are using the Python API here. Then
    |struct(*colsListToBePassed)| should be correct, given
    |colsListToBePassed| is a list of strings.

    Your method |call_to_cust_bulk_api| takes an argument |payload|,
    which is a |Column|. This is then used in |custRequestBody|.
    That is a pretty strange use of a column expression. What do you
    expect |print(payload)| to print?

    I recommend splitting that complex command into multiple commands
    to find out what the "column not iterable" error refers to.
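
    For example, a sketch using your names, so you can see which step
    raises the error:

    from pyspark.sql.functions import to_json, struct

    # evaluate each step separately to find the failing one
    cols = struct(*colsListToBePassed)                  # the struct expression
    payload = to_json(cols)                             # the JSON column
    status = call_to_cust_bulk_api(policyUrl, payload)  # your method

    finalDF = finalDF.withColumn("status_for_batch", status)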

    Enrico


    On 10.06.22 13:39, Enrico Minack wrote:
    Hi Sid,

    finalDF = finalDF.repartition(finalDF.rdd.getNumPartitions()) \
        .withColumn("status_for_batch",
                    call_to_cust_bulk_api(policyUrl,
                                          to_json(struct(*colsListToBePassed))))
    You are calling |withColumn| with the result of
    |call_to_cust_bulk_api| as the second argument. That result
    looks like it is of type string. But |withColumn| expects type
    |Column|. You can turn that string into a |Column| using |lit|:

    finalDF = finalDF.repartition(finalDF.rdd.getNumPartitions()) \
        .withColumn("status_for_batch",
                    lit(call_to_cust_bulk_api(policyUrl,
                                              to_json(struct(*colsListToBePassed)))))

    You are saying that gives you a "column not iterable" error. I
    reckon the |struct(*colsListToBePassed)| is wrong.

    Method |struct| requires a single string followed by a list of
    strings. Given your |colsListToBePassed| is a list of strings,
    this does not work. Try:

      struct(colsListToBePassed.head, colsListToBePassed.tail: _*)

    Alternatively, |struct| accepts a list of |Column|, so try this:

      struct(colsListToBePassed.map(col): _*)

    The API is pretty clear about the types it expects.


    If you are still getting errors, please paste the code and the
    error.

    Enrico



    On 09.06.22 21:31, Sid wrote:
    Hi Experts,

    I am facing a problem while passing a column to a method.
    The problem is described in detail here:

    
    https://stackoverflow.com/questions/72565095/how-to-pass-columns-as-a-json-record-to-the-api-method-using-pyspark

    TIA,
    Sid


