Hi Mich,

Thank you. Ah, I want to avoid funnelling all of the data through a
single node; that is my understanding of what coalesce(1) will do.
Perhaps I'll trigger a Lambda to rename/combine the files after PySpark
writes them.
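
For what it's worth, here is the rough shape of the Lambda I have in
mind (a sketch only, assuming boto3, a single part file produced by
coalesce(1), and hypothetical bucket/prefix names):

import boto3

s3 = boto3.client("s3")
bucket = "bucket_name"                   # hypothetical bucket
prefix = "test_folder/"                  # prefix Spark wrote to
target_key = "test_folder/pairs.json"    # desired final name

# find the single part file Spark produced under the prefix
objs = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)["Contents"]
part_key = next(o["Key"] for o in objs
                if o["Key"].split("/")[-1].startswith("part-"))

# S3 has no rename, so copy to the desired key and delete the original
s3.copy_object(Bucket=bucket, Key=target_key,
               CopySource={"Bucket": bucket, "Key": part_key})
s3.delete_object(Bucket=bucket, Key=part_key)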

Cheers,
Marco.

On Thu, May 4, 2023 at 5:25 PM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> you can try
>
> df2.coalesce(1).write.mode("overwrite").json("/tmp/pairs.json")
>
> hdfs dfs -ls /tmp/pairs.json
> Found 2 items
> -rw-r--r--   3 hduser supergroup          0 2023-05-04 22:21
> /tmp/pairs.json/_SUCCESS
> -rw-r--r--   3 hduser supergroup         96 2023-05-04 22:21
> /tmp/pairs.json/part-00000-21f12540-c1c6-441d-a9b2-a82ce2113853-c000.json
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 4 May 2023 at 22:14, Marco Costantini <
> marco.costant...@rocketfncl.com> wrote:
>
>> Hi Mich,
>> Thank you.
>> Are you saying this satisfies my requirement?
>>
>> On the other hand, I suspect there is something more going on here.
>> Perhaps the Spark 'part' files should not be thought of as files, but
>> rather as pieces of a conceptual file. If that is true, then your
>> approach (of which I'm well aware) makes sense. Question: what are
>> some good methods or tools for combining the parts into a single,
>> well-named file? I imagine that is outside the scope of PySpark, but
>> any advice is welcome.
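>>
>> For concreteness, one idea I had was to do the rename from within the
>> job itself, via the Hadoop FileSystem API exposed through the py4j
>> gateway. A rough sketch, assuming the output was first coalesced to a
>> single part file and using hypothetical paths; would something like
>> this be reasonable?
>>
>> # get the Hadoop FileSystem for the output path via the JVM gateway
>> hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
>> Path = spark.sparkContext._jvm.org.apache.hadoop.fs.Path
>> out_dir = Path("s3a://bucket_name/test_folder")    # hypothetical path
>> fs = out_dir.getFileSystem(hadoop_conf)
>>
>> # locate the part file Spark wrote and rename it to the desired name
>> parts = fs.globStatus(Path("s3a://bucket_name/test_folder/part-*"))
>> fs.rename(parts[0].getPath(),
>>           Path("s3a://bucket_name/test_folder/pairs.json"))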
>>
>> Thank you,
>> Marco.
>>
>> On Thu, May 4, 2023 at 5:05 PM Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>>> AWS S3 and Google GCS are Hadoop-compatible file systems (HCFS), so
>>> Spark shards its output into multiple part files when writing to them,
>>> which improves parallel read and write performance.
>>>
>>> Let us take your code for a drive
>>>
>>> import findspark
>>> findspark.init()
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.functions import struct
>>> from pyspark.sql.types import *
>>> spark = SparkSession.builder \
>>>         .getOrCreate()
>>> pairs = [(1, "a1"), (2, "a2"), (3, "a3")]
>>> Schema = StructType([StructField("ID", IntegerType(), False),
>>>                      StructField("datA", StringType(), True)])
>>> df = spark.createDataFrame(data=pairs, schema=Schema)
>>> df.printSchema()
>>> df.show()
>>> df2 = df.select(df.ID.alias("ID"), struct(df.datA).alias("Struct"))
>>> df2.printSchema()
>>> df2.show()
>>> df2.write.mode("overwrite").json("/tmp/pairs.json")
>>>
>>> root
>>>  |-- ID: integer (nullable = false)
>>>  |-- datA: string (nullable = true)
>>>
>>> +---+----+
>>> | ID|datA|
>>> +---+----+
>>> |  1|  a1|
>>> |  2|  a2|
>>> |  3|  a3|
>>> +---+----+
>>>
>>> root
>>>  |-- ID: integer (nullable = false)
>>>  |-- Struct: struct (nullable = false)
>>>  |    |-- datA: string (nullable = true)
>>>
>>> +---+------+
>>> | ID|Struct|
>>> +---+------+
>>> |  1|  {a1}|
>>> |  2|  {a2}|
>>> |  3|  {a3}|
>>> +---+------+
>>>
>>> Look at the last line, where the JSON output is written:
>>>
>>> df2.write.mode("overwrite").json("/tmp/pairs.json")
>>>
>>> Under the bonnet this happens:
>>>
>>> hdfs dfs -ls /tmp/pairs.json
>>> Found 5 items
>>> -rw-r--r--   3 hduser supergroup          0 2023-05-04 21:53
>>> /tmp/pairs.json/_SUCCESS
>>> -rw-r--r--   3 hduser supergroup          0 2023-05-04 21:53
>>> /tmp/pairs.json/part-00000-0b5780ae-f5b6-47e7-b44b-757948f03c3c-c000.json
>>> -rw-r--r--   3 hduser supergroup         32 2023-05-04 21:53
>>> /tmp/pairs.json/part-00001-0b5780ae-f5b6-47e7-b44b-757948f03c3c-c000.json
>>> -rw-r--r--   3 hduser supergroup         32 2023-05-04 21:53
>>> /tmp/pairs.json/part-00002-0b5780ae-f5b6-47e7-b44b-757948f03c3c-c000.json
>>> -rw-r--r--   3 hduser supergroup         32 2023-05-04 21:53
>>> /tmp/pairs.json/part-00003-0b5780ae-f5b6-47e7-b44b-757948f03c3c-c000.json
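>>>
>>> The number of part files simply mirrors the number of partitions the
>>> DataFrame had at write time, which you can confirm with (illustrative):
>>>
>>> print(df2.rdd.getNumPartitions())  # 4 here, matching the four part files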
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Lead Solutions Architect/Engineering Lead
>>> Palantir Technologies Limited
>>> London
>>> United Kingdom
>>>
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Thu, 4 May 2023 at 21:38, Marco Costantini <
>>> marco.costant...@rocketfncl.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I am testing writing my DataFrame to S3 using the DataFrame `write`
>>>> method. It mostly does a great job. However, it fails one of my
>>>> requirements. Here are my requirements.
>>>>
>>>> - Write to S3
>>>> - use `partitionBy` to automatically make folders based on my chosen
>>>> partition columns
>>>> - control the resultant filename (whole or in part)
>>>>
>>>> I can get the first two requirements met but not the third.
>>>>
>>>> Here's an example. When I use the commands...
>>>>
>>>> df.write.partitionBy("year","month").mode("append")\
>>>>     .json('s3a://bucket_name/test_folder/')
>>>>
>>>> ... I get the partitions I need. However, the filenames are something
>>>> like: part-00000-0e2e2096-6d32-458d-bcdf-dbf7d74d80fd.c000.json
>>>>
>>>>
>>>> Now, I understand Spark's need to include the partition number in the
>>>> filename. However, it sure would be nice to control the rest of the file
>>>> name.
>>>>
>>>>
>>>> Any advice? Please and thank you.
>>>>
>>>> Marco.
>>>>
>>>
