Hi Mich,

Thank you. Ah, I want to avoid bringing all data to the driver node; that is my understanding of what will happen in that case. Perhaps I'll trigger a Lambda to rename/combine the files after PySpark writes them.
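Roughly what I have in mind for that Lambda (an untested sketch: the bucket, prefix, and target naming scheme are placeholders, and since S3 has no true rename it copies each part object to a new key and deletes the original):

import boto3

s3 = boto3.client("s3")

def handler(event, context):
    # Placeholder bucket/prefix; in practice these would come from the event
    bucket = "bucket_name"
    prefix = "test_folder/year=2023/month=5/"

    # List the part files Spark wrote under the partition prefix
    objects = s3.list_objects_v2(Bucket=bucket, Prefix=prefix).get("Contents", [])
    part_keys = sorted(o["Key"] for o in objects if "part-" in o["Key"])

    # "Rename" each part file: copy it to a chosen key, then delete the original
    for i, key in enumerate(part_keys):
        new_key = f"{prefix}my_data_{i:05d}.json"
        s3.copy_object(Bucket=bucket, Key=new_key,
                       CopySource={"Bucket": bucket, "Key": key})
        s3.delete_object(Bucket=bucket, Key=key)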
Cheers, Marco.

On Thu, May 4, 2023 at 5:25 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> you can try
>
> df2.coalesce(1).write.mode("overwrite").json("/tmp/pairs.json")
>
> hdfs dfs -ls /tmp/pairs.json
> Found 2 items
> -rw-r--r--   3 hduser supergroup          0 2023-05-04 22:21 /tmp/pairs.json/_SUCCESS
> -rw-r--r--   3 hduser supergroup         96 2023-05-04 22:21 /tmp/pairs.json/part-00000-21f12540-c1c6-441d-a9b2-a82ce2113853-c000.json
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
> On Thu, 4 May 2023 at 22:14, Marco Costantini <marco.costant...@rocketfncl.com> wrote:
>
>> Hi Mich,
>> Thank you.
>> Are you saying this satisfies my requirement?
>>
>> On the other hand, I sense something else is going on. Perhaps the Spark
>> 'part' files should not be thought of as files, but rather as pieces of a
>> conceptual file. If that is true, then your approach (of which I'm well
>> aware) makes sense. Question: what are some good methods or tools for
>> combining the parts into a single, well-named file? I imagine that is
>> outside the scope of PySpark, but any advice is welcome.
>>
>> Thank you,
>> Marco.
>>
>> On Thu, May 4, 2023 at 5:05 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>>> AWS S3 and Google GCS are Hadoop-compatible file systems (HCFS), so the
>>> output is sharded to improve read performance when writing to them.
>>>
>>> Let us take your code for a drive:
>>>
>>> import findspark
>>> findspark.init()
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.functions import struct
>>> from pyspark.sql.types import *
>>> spark = SparkSession.builder \
>>>     .getOrCreate()
>>> pairs = [(1, "a1"), (2, "a2"), (3, "a3")]
>>> Schema = StructType([StructField("ID", IntegerType(), False),
>>>                      StructField("datA", StringType(), True)])
>>> df = spark.createDataFrame(data=pairs, schema=Schema)
>>> df.printSchema()
>>> df.show()
>>> df2 = df.select(df.ID.alias("ID"), struct(df.datA).alias("Struct"))
>>> df2.printSchema()
>>> df2.show()
>>> df2.write.mode("overwrite").json("/tmp/pairs.json")
>>>
>>> root
>>>  |-- ID: integer (nullable = false)
>>>  |-- datA: string (nullable = true)
>>>
>>> +---+----+
>>> | ID|datA|
>>> +---+----+
>>> |  1|  a1|
>>> |  2|  a2|
>>> |  3|  a3|
>>> +---+----+
>>>
>>> root
>>>  |-- ID: integer (nullable = false)
>>>  |-- Struct: struct (nullable = false)
>>>  |    |-- datA: string (nullable = true)
>>>
>>> +---+------+
>>> | ID|Struct|
>>> +---+------+
>>> |  1|  {a1}|
>>> |  2|  {a2}|
>>> |  3|  {a3}|
>>> +---+------+
>>>
>>> Look at the last line, where the JSON output is written:
>>> df2.write.mode("overwrite").json("/tmp/pairs.json")
>>> Under the bonnet this happens:
>>>
>>> hdfs dfs -ls /tmp/pairs.json
>>> Found 5 items
>>> -rw-r--r--   3 hduser supergroup          0 2023-05-04 21:53 /tmp/pairs.json/_SUCCESS
>>> -rw-r--r--   3 hduser supergroup          0 2023-05-04 21:53 /tmp/pairs.json/part-00000-0b5780ae-f5b6-47e7-b44b-757948f03c3c-c000.json
>>> -rw-r--r--   3 hduser supergroup         32 2023-05-04 21:53 /tmp/pairs.json/part-00001-0b5780ae-f5b6-47e7-b44b-757948f03c3c-c000.json
>>> -rw-r--r--   3 hduser supergroup         32 2023-05-04 21:53 /tmp/pairs.json/part-00002-0b5780ae-f5b6-47e7-b44b-757948f03c3c-c000.json
>>> -rw-r--r--   3 hduser supergroup         32 2023-05-04 21:53 /tmp/pairs.json/part-00003-0b5780ae-f5b6-47e7-b44b-757948f03c3c-c000.json
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Lead Solutions Architect/Engineering Lead
>>> Palantir Technologies Limited
>>> London
>>> United Kingdom
>>>
>>> view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>> On Thu, 4 May 2023 at 21:38, Marco Costantini <marco.costant...@rocketfncl.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I am testing writing my DataFrame to S3 using the DataFrame `write`
>>>> method. It mostly does a great job. However, it fails one of my
>>>> requirements. Here are my requirements:
>>>>
>>>> - Write to S3
>>>> - use `partitionBy` to automatically make folders based on my chosen
>>>>   partition columns
>>>> - control the resultant filename (whole or in part)
>>>>
>>>> I can get the first two requirements met but not the third.
>>>>
>>>> Here's an example. When I use the commands...
>>>>
>>>> df.write.partitionBy("year","month").mode("append")\
>>>>     .json('s3a://bucket_name/test_folder/')
>>>>
>>>> ... I get the partitions I need.
>>>> However, the filenames are something like:
>>>> part-00000-0e2e2096-6d32-458d-bcdf-dbf7d74d80fd.c000.json
>>>>
>>>> Now, I understand Spark's need to include the partition number in the
>>>> filename. However, it sure would be nice to control the rest of the file
>>>> name.
>>>>
>>>> Any advice? Please and thank you.
>>>>
>>>> Marco.
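For completeness, here is roughly what the rename could look like from Spark itself after a coalesce(1) write, using the Hadoop FileSystem API through the SparkContext's JVM gateway instead of a Lambda. This is an untested sketch: the s3a path and the final filename are placeholders, and nothing is collected to the driver, since the rename happens on the storage side.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Small stand-in for df2 from the example above
df2 = spark.createDataFrame([(1, "a1"), (2, "a2"), (3, "a3")], ["ID", "datA"])

# Write a single part file (fine here because the data is tiny)
out_dir = "s3a://bucket_name/test_folder/pairs_tmp"   # placeholder path
df2.coalesce(1).write.mode("overwrite").json(out_dir)

# Rename the part file via the Hadoop FileSystem API (py4j gateway)
jvm = sc._jvm
hadoop_conf = sc._jsc.hadoopConfiguration()
src_dir = jvm.org.apache.hadoop.fs.Path(out_dir)
fs = src_dir.getFileSystem(hadoop_conf)

for status in fs.listStatus(src_dir):
    name = status.getPath().getName()
    if name.startswith("part-"):
        # Give the single part file a chosen, stable name
        fs.rename(status.getPath(),
                  jvm.org.apache.hadoop.fs.Path(out_dir + "/pairs.json"))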