Hi,

I think this is a clear example of over-engineering.

Ayan's advice is the best. Please use the Spark SQL function
input_file_name() to join the tables. People do not work in terms of RDDs
anymore unless it is absolutely required.
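
A minimal sketch of that approach (here df stands for the dataframe holding the
identifier columns, and the wildcard path is an assumption based on the paths
shown further down the thread):

import org.apache.spark.sql.functions.input_file_name

// Read every JSON file in one pass and record which file each row came from.
val jsonDf = spark.read
  .json("gs://bucket1/path/to/*/file_result.json")
  .withColumn("file_path", input_file_name())

// Join the parsed JSON back to the identifier columns; note that the value
// returned by input_file_name() must match the stored file_path format.
val joined = df.join(jsonDf, Seq("file_path"))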

Also, if you have different JSON schemas across the files, just have Spark
read each file as a string first, or scan 100% of the files to infer the
full schema.
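
For the differing-schema case, a rough sketch of both options (the paths are
illustrative):

// Option 1: keep each document as one raw string per file and parse it later.
val rawDf = spark.read
  .option("wholetext", "true")
  .text("gs://bucket1/path/to/*/file_result.json")
  .withColumnRenamed("value", "json_content")

// Option 2: make the JSON reader scan all files when inferring the schema.
val fullSchemaDf = spark.read
  .option("samplingRatio", "1.0")
  .json("gs://bucket1/path/to/*/file_result.json")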

Regards,
Gourav Sengupta

On Wed, Jul 13, 2022 at 12:41 AM Muthu Jayakumar <bablo...@gmail.com> wrote:

> Hello Ayan,
>
> Thank you for the suggestion. But I would lose the correlation of each JSON
> file with the other identifier fields. Also, if there are too many files,
> will that be an issue? Plus, I may not have the same schema across all the
> files.
>
> Hello Enrico,
>
> >how does RDD's mapPartitions make a difference regarding
> I guess that, for the question above, I do have to process row-wise, and an
> RDD may be more efficient?
>
> Thanks,
> Muthu
>
> On Tue, 12 Jul 2022 at 14:55, ayan guha <guha.a...@gmail.com> wrote:
>
>> Another option is:
>>
>> 1. Collect the dataframe's file path column
>> 2. Create a list of the paths
>> 3. Create a new dataframe with spark.read.json, passing it the list of paths
>>
>> This will save you lots of headaches; a rough sketch follows below.
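>>
>> A minimal sketch of those steps (assuming the original dataframe is called df
>> and the path column is named file_path):
>>
>> import spark.implicits._
>>
>> // 1. + 2. Collect the file paths into a local list on the driver.
>> val paths = df.select("file_path").as[String].collect().toSeq
>>
>> // 3. Read every listed file into one new dataframe; the JSON schemas are merged.
>> val jsonDf = spark.read.json(paths: _*)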
>>
>> Ayan
>>
>>
>> On Wed, Jul 13, 2022 at 7:35 AM Enrico Minack <i...@enrico.minack.dev>
>> wrote:
>>
>>> Hi,
>>>
>>> how does RDD's mapPartitions make a difference regarding 1. and 2.
>>> compared to Dataset's mapPartitions / map function?
>>>
>>> Enrico
>>>
>>>
>>> Am 12.07.22 um 22:13 schrieb Muthu Jayakumar:
>>>
>>> Hello Enrico,
>>>
>>> Thanks for the reply. I found that I would have to use the `mapPartitions`
>>> API of RDD to perform this safely, as I have to:
>>> 1. Read each file from GCS using the HDFS FileSystem API.
>>> 2. Parse each JSON record in a safe manner.
>>>
>>> For (1) to work, I do have to broadcast the HadoopConfiguration from the
>>> sparkContext. I did try to use the GCS Java API to read the content, but I ran
>>> into many JAR conflicts, as the HDFS wrapper and that library use different
>>> dependencies.
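>>>
>>> Roughly, the approach looks like this (a sketch only; the class names reuse
>>> Enrico's example below, and the error handling and config broadcast are
>>> illustrative, not necessarily what I ended up with):
>>>
>>> import org.apache.hadoop.conf.Configuration
>>> import org.apache.hadoop.fs.Path
>>> import scala.collection.JavaConverters._
>>> import scala.io.Source
>>> import scala.util.Try
>>>
>>> // Hadoop's Configuration is not serializable, so ship it as plain key/value pairs.
>>> val hadoopConfEntries = spark.sparkContext.hadoopConfiguration.iterator().asScala
>>>   .map(e => e.getKey -> e.getValue).toMap
>>> val confBc = spark.sparkContext.broadcast(hadoopConfEntries)
>>>
>>> val withContent = ds.rdd.mapPartitions { rows =>
>>>   // Rebuild the Hadoop configuration once per partition.
>>>   val conf = new Configuration(false)
>>>   confBc.value.foreach { case (k, v) => conf.set(k, v) }
>>>   rows.map { row =>
>>>     // (1) Read the file via the HDFS FileSystem API, (2) keep failures per row.
>>>     val content = Try {
>>>       val path = new Path(row.file_path)
>>>       val in = path.getFileSystem(conf).open(path)
>>>       try Source.fromInputStream(in, "UTF-8").mkString finally in.close()
>>>     }.getOrElse("<failed to read>")
>>>     RowWithJsonContent(row.entity_id, content, row.other_useful_id)
>>>   }
>>> }
>>>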
>>> Hope these findings help others as well.
>>>
>>> Thanks,
>>> Muthu
>>>
>>>
>>> On Mon, 11 Jul 2022 at 14:11, Enrico Minack <i...@enrico.minack.dev>
>>> wrote:
>>>
>>>> All you need to do is implement a method readJson that reads a single
>>>> file given its path. Then, you map the values of the file_path column to
>>>> the respective JSON content as a string. This can be done via a UDF or
>>>> simply Dataset.map:
>>>>
>>>> case class RowWithJsonUri(entity_id: String, file_path: String, other_useful_id: String)
>>>> case class RowWithJsonContent(entity_id: String, json_content: String, other_useful_id: String)
>>>>
>>>> val ds = Seq(
>>>>   RowWithJsonUri("id-01f7pqqbxddb3b1an6ntyqx6mg", "gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json", "id-2-01g4he5cb4xqn6s1999k6y1vbd"),
>>>>   RowWithJsonUri("id-01f7pqgbwms4ajmdtdedtwa3mf", "gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json", "id-2-01g4he5cbh52che104rwy603sr"),
>>>>   RowWithJsonUri("id-01f7pqqbxejt3ef4ap9qcs78m5", "gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json", "id-2-01g4he5cbqmdv7dnx46sebs0gt"),
>>>>   RowWithJsonUri("id-01f7pqqbynh895ptpjjfxvk6dc", "gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json", "id-2-01g4he5cbx1kwhgvdme1s560dw")
>>>> ).toDS()
>>>>
>>>> ds.show(false)
>>>>
>>>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>>>> |entity_id                    |file_path                                                          |other_useful_id                |
>>>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>>>> |id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
>>>> |id-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
>>>> |id-01f7pqqbxejt3ef4ap9qcs78m5|gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
>>>> |id-01f7pqqbynh895ptpjjfxvk6dc|gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
>>>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>>>>
>>>>
>>>> def readJson(uri: String): String = { s"content of $uri" }
>>>>
>>>> ds.map { row => RowWithJsonContent(row.entity_id, readJson(row.file_path), row.other_useful_id) }.show(false)
>>>>
>>>> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
>>>> |entity_id                    |json_content                                                                  |other_useful_id                |
>>>> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
>>>> |id-01f7pqqbxddb3b1an6ntyqx6mg|content of gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
>>>> |id-01f7pqgbwms4ajmdtdedtwa3mf|content of gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
>>>> |id-01f7pqqbxejt3ef4ap9qcs78m5|content of gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
>>>> |id-01f7pqqbynh895ptpjjfxvk6dc|content of gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
>>>> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
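>>>>
>>>> For reference, a rough sketch of what a real readJson could look like, using
>>>> the Hadoop FileSystem API (an illustration only, not part of the example
>>>> above). It assumes the executors' default Hadoop configuration (core-site.xml
>>>> on their classpath) already carries the GCS connector settings; otherwise the
>>>> configuration has to be made available on the executors explicitly, for
>>>> example by broadcasting it:
>>>>
>>>> import org.apache.hadoop.conf.Configuration
>>>> import org.apache.hadoop.fs.Path
>>>> import scala.io.Source
>>>>
>>>> def readJson(uri: String): String = {
>>>>   // Resolve the filesystem (gs://, hdfs://, file://, ...) from the URI scheme.
>>>>   val path = new Path(uri)
>>>>   val in = path.getFileSystem(new Configuration()).open(path)
>>>>   try Source.fromInputStream(in, "UTF-8").mkString finally in.close()
>>>> }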
>>>>
>>>> Cheers,
>>>> Enrico
>>>>
>>>>
>>>>
>>>>
>>>> Am 10.07.22 um 09:11 schrieb Muthu Jayakumar:
>>>>
>>>> Hello there,
>>>>
>>>> I have a dataframe with the following...
>>>>
>>>>
>>>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>>>> |entity_id                    |file_path                                                          |other_useful_id                |
>>>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>>>> |id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
>>>> |id-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
>>>> |id-01f7pqqbxejt3ef4ap9qcs78m5|gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
>>>> |id-01f7pqqbynh895ptpjjfxvk6dc|gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
>>>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>>>>
>>>> I would like to read the file referenced by each row's `file_path` and write
>>>> the result to another dataframe containing `entity_id`, `other_useful_id`,
>>>> `json_content`, and `file_path`.
>>>> Assume that I already have the required HDFS URL libraries on my classpath.
>>>>
>>>> Please advise,
>>>> Muthu
>>>>
>>>>
>>>>
>>>>
>>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
