Coming in late... but if I understand correctly, you can simply use the fact that spark.read (or readStream) will also accept a directory argument. If you provide a directory, Spark will automagically pull in all the files in that directory.
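Since the original question is about Structured Streaming, the readStream variant is essentially the same call. A minimal sketch (untested; the path and the cut-down two-field schema are placeholders — the batch example with the full schema follows below):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.master('local[*]').appName('spark_streaming').getOrCreate()

# Placeholder schema; streaming file sources require an explicit schema
placeholder_schema = StructType([
    StructField("username", StringType(), True),
    StructField("salary", IntegerType(), True),
])

# readStream also takes a directory; new files dropped into it are
# picked up as they arrive
stream_df = (spark.readStream
             .schema(placeholder_schema)
             .json('./data/my_directory'))

query = (stream_df.writeStream
         .format('console')
         .outputMode('append')
         .start())
query.awaitTermination()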
"""Reading in multiple files example""" spark = SparkSession.builder.master('local[*]').appName('spark_streaming').getOrCreate() # Schema for incoming data json_schema = StructType([StructField("username", StringType(), True), StructField("name", StringType(), True), StructField("sex", StringType(), True), StructField("address", StringType(), True), StructField("mail", StringType(), True), StructField("birthdate", DateType(), True), StructField("work", StringType(), True), StructField("salary", IntegerType(), True), StructField("timestamp", TimestampType(), True)]) # Read in a bunch of data files (files are in JSON per line format)data_directory_path = './data/my_directory' # Create a Spark DF with a bunch of files spark_df = spark.read.schema(json_schema).json(data_directory_path) On Mon, Jan 18, 2021 at 11:22 AM Amit Joshi <mailtojoshia...@gmail.com> wrote: > Hi Boris, > > Thanks for your code block. > I understood what you are trying to achieve in the code. > > But content in the file are json records seperated by new line. > And we have to make the dataframe out of it, as some processing has to be > done on it. > > Regards > Amit > On Monday, January 18, 2021, Boris Litvak <boris.lit...@skf.com> wrote: > >> HI Amit, >> >> >> >> I was thinking along the lines of (python): >> >> >> >> >> @udf(returnType=StringType()) >> def reader_udf(filename: str) -> str: >> with open(filename, "r") as f: >> return f.read() >> >> >> def run_locally(): >> with utils.build_spark_session("Local", local=True) as spark: >> df = spark.readStream.csv(r'testdata', schema >> =StructType([StructField('filename', StringType(), True)])) >> df = df.withColumn('content', reader_udf(col('filename'))) >> q = df.select('content').writeStream.queryName('test').format( >> 'console').start() >> q.awaitTermination() >> >> >> >> Now each row contains the contents of the files, provided they are not >> large you can foreach() over the df/rdd and do whatever you want with it, >> such as json.loads()/etc. >> >> If you know the shema of the jsons, you can later explode() them into a >> flat DF, ala >> https://stackoverflow.com/questions/38243717/spark-explode-nested-json-with-array-in-scala >> >> >> >> Note that unless I am missing something you cannot access spark session >> from foreach as code is not running on the driver. >> >> Please say if it makes sense or did I miss anything. >> >> >> >> Boris >> >> >> >> *From:* Amit Joshi <mailtojoshia...@gmail.com> >> *Sent:* Monday, 18 January 2021 17:10 >> *To:* Boris Litvak <boris.lit...@skf.com> >> *Cc:* spark-user <user@spark.apache.org> >> *Subject:* Re: [Spark Structured Streaming] Processing the data path >> coming from kafka. >> >> >> >> Hi Boris, >> >> >> >> I need to do processing on the data present in the path. >> >> That is the reason I am trying to make the dataframe. >> >> >> >> Can you please provide the example of your solution? >> >> >> >> Regards >> >> Amit >> >> >> >> On Mon, Jan 18, 2021 at 7:15 PM Boris Litvak <boris.lit...@skf.com> >> wrote: >> >> Hi Amit, >> >> >> >> Why won’t you just map()/mapXXX() the kafkaDf with the mapping function >> that reads the paths? >> >> Also, do you really have to read the json into an additional dataframe? >> >> >> >> Thanks, Boris >> >> >> >> *From:* Amit Joshi <mailtojoshia...@gmail.com> >> *Sent:* Monday, 18 January 2021 15:04 >> *To:* spark-user <user@spark.apache.org> >> *Subject:* [Spark Structured Streaming] Processing the data path coming >> from kafka. 
>>
>> Hi,
>>
>> I have a use case where the file paths of json records stored in s3
>> come in as kafka messages. I have to process the data using spark
>> structured streaming.
>>
>> The design which I thought of is as follows:
>> 1. In Spark structured streaming, read the kafka message containing the
>> data path.
>> 2. Collect the message records in the driver. (Messages are small in size.)
>> 3. Create the dataframe from the data location.
>>
>> kafkaDf.select($"value".cast(StringType))
>>   .writeStream.foreachBatch((batchDf: DataFrame, batchId: Long) => {
>>     // rough code
>>     // collect to driver
>>     val records = batchDf.collect()
>>     // create dataframe and process
>>     records.foreach((rec: Row) => {
>>       println("records:######################", rec.toString())
>>       val path = rec.getAs[String]("data_path")
>>       val dfToProcess = spark.read.json(path)
>>       ....
>>     })
>>   })
>>
>> I would like to know your views on whether this approach is fine,
>> specifically whether there is some problem with creating the dataframe
>> after calling collect.
>> If there is a better approach, please let me know.
>>
>> Regards
>> Amit Joshi
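For comparison, a rough PySpark sketch of the same design (untested; the topic name, bootstrap servers, checkpoint path, and the assumption that the Kafka value holds only the file path are illustrative, not from the thread). Because foreachBatch runs on the driver, the SparkSession is available inside it, and spark.read.json() accepts a list of paths, so each micro-batch can be read with a single call:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName('path_from_kafka').getOrCreate()

# Illustrative Kafka source settings
kafka_df = (spark.readStream
            .format('kafka')
            .option('kafka.bootstrap.servers', 'localhost:9092')
            .option('subscribe', 'data-paths')
            .load())

def process_batch(batch_df, batch_id):
    # Collecting is acceptable here because the messages are small (just paths)
    paths = [row['value'] for row in
             batch_df.select(col('value').cast('string').alias('value')).collect()]
    if not paths:
        return
    # spark.read.json accepts a list of paths, so one read covers the whole batch
    df_to_process = spark.read.json(paths)
    df_to_process.show()  # placeholder for the real processing

query = (kafka_df.writeStream
         .foreachBatch(process_batch)
         .option('checkpointLocation', '/tmp/path_from_kafka_chk')  # illustrative path
         .start())
query.awaitTermination()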