Hello Ayan,

Thank you for the suggestion. But I would lose the correlation of each JSON file with the other identifier fields. Also, if there are too many files, would that be an issue? Plus, I may not have the same schema across all the files.
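
If I went that route, I suppose the correlation could be recovered by tagging rows with input_file_name() and joining the identifiers back on. A rough, untested sketch (`df` here stands for my dataframe with `entity_id`, `file_path`, `other_useful_id`):

import org.apache.spark.sql.functions.input_file_name
import spark.implicits._

// 1. + 2. collect the file paths into a local list
val paths = df.select("file_path").as[String].collect().toSeq

// 3. read all files at once; input_file_name() records which file each JSON
// row came from (caveat: it must match the stored path format exactly).
val json = spark.read.json(paths: _*).withColumn("file_path", input_file_name())
val result = json.join(df, Seq("file_path"))

Still, spark.read.json would infer one merged schema across all the files, which is what worries me about the differing schemas.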
Hello Enrico,

> how does RDD's mapPartitions make a difference regarding 1. and 2.

I guess, for the question above, I do have to process row-wise, and RDD may be the more efficient option?

Thanks,
Muthu

On Tue, 12 Jul 2022 at 14:55, ayan guha <guha.a...@gmail.com> wrote:

> Another option is:
>
> 1. collect the dataframe with the file path
> 2. create a list of paths
> 3. create a new dataframe with spark.read.json and pass it the list of paths
>
> This will save you lots of headaches.
>
> Ayan
>
>
> On Wed, Jul 13, 2022 at 7:35 AM Enrico Minack <i...@enrico.minack.dev> wrote:
>
>> Hi,
>>
>> how does RDD's mapPartitions make a difference regarding 1. and 2.
>> compared to Dataset's mapPartitions / map function?
>>
>> Enrico
>>
>>
>> On 12.07.22 22:13, Muthu Jayakumar wrote:
>>
>> Hello Enrico,
>>
>> Thanks for the reply. I found that I would have to use the `mapPartitions`
>> API of RDD to perform this safely, as I have to
>> 1. read each file from GCS using the HDFS FileSystem API, and
>> 2. parse each JSON record in a safe manner.
>>
>> For (1) to work, I do have to broadcast the HadoopConfiguration from the
>> sparkContext. I did try to use the GCS Java API to read the content, but
>> ran into many JAR conflicts, as the HDFS wrapper and the GCS library use
>> different dependencies.
>> Hope these findings help others as well.
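>>
>> For what it's worth, a rough, untested sketch of (1) with `mapPartitions`
>> (assuming Spark 3.x, where org.apache.spark.util.SerializableConfiguration
>> is public, and the GCS Hadoop connector on the classpath; `ds` and the
>> case classes are from Enrico's example below):
>>
>> import org.apache.hadoop.fs.{FileSystem, Path}
>> import org.apache.spark.util.SerializableConfiguration
>>
>> // Hadoop's Configuration is not serializable, so wrap it for the broadcast.
>> val hadoopConf = spark.sparkContext.broadcast(
>>   new SerializableConfiguration(spark.sparkContext.hadoopConfiguration))
>>
>> val withContent = ds.rdd.mapPartitions { rows =>
>>   val conf = hadoopConf.value.value
>>   rows.map { row =>
>>     val path = new Path(row.file_path)
>>     val fs = FileSystem.get(path.toUri, conf) // FileSystem instances are cached
>>     val in = fs.open(path)
>>     val json =
>>       try scala.io.Source.fromInputStream(in, "UTF-8").mkString
>>       finally in.close()
>>     // (2) would parse `json` here, wrapped in scala.util.Try for safety.
>>     RowWithJsonContent(row.entity_id, json, row.other_useful_id)
>>   }
>> }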
>> Thanks,
>> Muthu
>>
>>
>> On Mon, 11 Jul 2022 at 14:11, Enrico Minack <i...@enrico.minack.dev> wrote:
>>
>>> All you need to do is implement a method readJson that reads a single
>>> file given its path. Then, you map the values of column file_path to the
>>> respective JSON content as a string. This can be done via a UDF or simply
>>> Dataset.map:
>>>
>>> case class RowWithJsonUri(entity_id: String, file_path: String, other_useful_id: String)
>>> case class RowWithJsonContent(entity_id: String, json_content: String, other_useful_id: String)
>>>
>>> val ds = Seq(
>>>   RowWithJsonUri("id-01f7pqqbxddb3b1an6ntyqx6mg", "gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json", "id-2-01g4he5cb4xqn6s1999k6y1vbd"),
>>>   RowWithJsonUri("id-01f7pqgbwms4ajmdtdedtwa3mf", "gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json", "id-2-01g4he5cbh52che104rwy603sr"),
>>>   RowWithJsonUri("id-01f7pqqbxejt3ef4ap9qcs78m5", "gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json", "id-2-01g4he5cbqmdv7dnx46sebs0gt"),
>>>   RowWithJsonUri("id-01f7pqqbynh895ptpjjfxvk6dc", "gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json", "id-2-01g4he5cbx1kwhgvdme1s560dw")
>>> ).toDS()
>>>
>>> ds.show(false)
>>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>>> |entity_id                    |file_path                                                          |other_useful_id                |
>>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>>> |id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
>>> |id-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
>>> |id-01f7pqqbxejt3ef4ap9qcs78m5|gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
>>> |id-01f7pqqbynh895ptpjjfxvk6dc|gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
>>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>>>
>>> def readJson(uri: String): String = { s"content of $uri" }
>>>
>>> ds.map { row => RowWithJsonContent(row.entity_id, readJson(row.file_path), row.other_useful_id) }.show(false)
>>> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
>>> |entity_id                    |json_content                                                                  |other_useful_id                |
>>> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
>>> |id-01f7pqqbxddb3b1an6ntyqx6mg|content of gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
>>> |id-01f7pqgbwms4ajmdtdedtwa3mf|content of gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
>>> |id-01f7pqqbxejt3ef4ap9qcs78m5|content of gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
>>> |id-01f7pqqbynh895ptpjjfxvk6dc|content of gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
>>> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
>>>
>>> Cheers,
>>> Enrico
>>>
>>>
>>> On 10.07.22 09:11, Muthu Jayakumar wrote:
>>>
>>> Hello there,
>>>
>>> I have a dataframe with the following...
>>>
>>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>>> |entity_id                    |file_path                                                          |other_useful_id                |
>>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>>> |id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
>>> |id-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
>>> |id-01f7pqqbxejt3ef4ap9qcs78m5|gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
>>> |id-01f7pqqbynh895ptpjjfxvk6dc|gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
>>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>>>
>>> I would like to read, for each row, the file at `file_path` and write the
>>> result to another dataframe containing `entity_id`, `other_useful_id`,
>>> `json_content`, and `file_path`.
>>> Assume that I already have the required HDFS URL libraries on my classpath.
>>>
>>> Please advise,
>>> Muthu
>
> --
> Best Regards,
> Ayan Guha