Coming in late, but if I understand correctly, you can simply use the fact
that spark.read (or spark.readStream) also accepts a directory as its path
argument. If you provide a directory, Spark will automagically pull in all
the files in that directory.

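"""Reading in multiple files example"""
from pyspark.sql import SparkSession
from pyspark.sql.types import (DateType, IntegerType, StringType,
                               StructField, StructType, TimestampType)

spark = SparkSession.builder.master('local[*]').appName('spark_streaming').getOrCreate()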

# Schema for incoming data
json_schema = StructType([StructField("username", StringType(), True),
                          StructField("name", StringType(), True),
                          StructField("sex", StringType(), True),
                          StructField("address", StringType(), True),
                          StructField("mail", StringType(), True),
                          StructField("birthdate", DateType(), True),
                          StructField("work", StringType(), True),
                          StructField("salary", IntegerType(), True),
                          StructField("timestamp", TimestampType(), True)])

# Read in a bunch of data files (files are in JSON-per-line format)
data_directory_path = './data/my_directory'


# Create a Spark DF with a bunch of files
spark_df = spark.read.schema(json_schema).json(data_directory_path)
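
If the files do not all live under one directory (as in the original
question, where the paths arrive through Kafka), the same reader also accepts
a list of paths, so a whole micro-batch of paths can be read in one call.
Below is a minimal sketch of that idea, not a tested pipeline. The column
name "data_path" is borrowed from Amit's rough code, and distinct_paths(),
make_batch_processor() and the handle() callback are hypothetical names I
made up for illustration. Since foreachBatch runs its function on the driver,
using the Spark session there is fine.

```python
# Sketch only: the "data_path" column name and the handle() callback are
# assumptions for illustration, not something defined in this thread.


def distinct_paths(rows, column="data_path"):
    """Return the non-empty paths found in `rows`, de-duplicated, in order."""
    seen = set()
    paths = []
    for row in rows:
        path = row[column]
        if path and path not in seen:
            seen.add(path)
            paths.append(path)
    return paths


def make_batch_processor(spark, schema, handle, column="data_path"):
    """Build a function suitable for DataStreamWriter.foreachBatch()."""

    def process_batch(batch_df, batch_id):
        # The path messages are small, so collecting them to the driver is cheap.
        rows = batch_df.select(column).collect()
        paths = distinct_paths(rows, column)
        if not paths:
            return  # nothing to read in this micro-batch
        # One read for the whole batch: json() accepts a list of paths.
        files_df = spark.read.schema(schema).json(paths)
        handle(files_df, batch_id)

    return process_batch
```

You would then wire it up with something like
paths_df.writeStream.foreachBatch(make_batch_processor(spark, json_schema, my_handler)).start(),
where paths_df is assumed to be a streaming DataFrame that already has a
"data_path" string column and my_handler is whatever processing you need.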




On Mon, Jan 18, 2021 at 11:22 AM Amit Joshi <mailtojoshia...@gmail.com>
wrote:

> Hi Boris,
>
> Thanks for your code block.
> I understood what you are trying to achieve in the code.
>
> But the content of the file is JSON records separated by newlines,
> and we have to make a dataframe out of it, as some processing has to be
> done on it.
>
> Regards
> Amit
> On Monday, January 18, 2021, Boris Litvak <boris.lit...@skf.com> wrote:
>
>> Hi Amit,
>>
>>
>>
>> I was thinking along the lines of (python):
>>
>>
>>
>>
>> from pyspark.sql.functions import col, udf
>> from pyspark.sql.types import StringType, StructField, StructType
>>
>>
>> @udf(returnType=StringType())
>> def reader_udf(filename: str) -> str:
>>     # Return the whole file as a single string
>>     with open(filename, "r") as f:
>>         return f.read()
>>
>>
>> def run_locally():
>>     # utils.build_spark_session is a helper that is not shown in this thread
>>     with utils.build_spark_session("Local", local=True) as spark:
>>         df = spark.readStream.csv(
>>             r'testdata',
>>             schema=StructType([StructField('filename', StringType(), True)]))
>>         df = df.withColumn('content', reader_udf(col('filename')))
>>         q = (df.select('content').writeStream.queryName('test')
>>              .format('console').start())
>>         q.awaitTermination()
>>
>>
>>
>> Now each row contains the contents of a file. Provided the files are not
>> large, you can foreach() over the df/rdd and do whatever you want with it,
>> such as json.loads(), etc.
>>
>> If you know the schema of the JSONs, you can later explode() them into a
>> flat DF, a la
>> https://stackoverflow.com/questions/38243717/spark-explode-nested-json-with-array-in-scala
>>
>>
>>
>> Note that, unless I am missing something, you cannot access the Spark
>> session from foreach, as that code is not running on the driver.
>>
>> Please say whether this makes sense or whether I missed anything.
>>
>>
>>
>> Boris
>>
>>
>>
>> *From:* Amit Joshi <mailtojoshia...@gmail.com>
>> *Sent:* Monday, 18 January 2021 17:10
>> *To:* Boris Litvak <boris.lit...@skf.com>
>> *Cc:* spark-user <user@spark.apache.org>
>> *Subject:* Re: [Spark Structured Streaming] Processing the data path
>> coming from kafka.
>>
>>
>>
>> Hi Boris,
>>
>>
>>
>> I need to do processing on the data present in the path.
>>
>> That is the reason I am trying to make the dataframe.
>>
>>
>>
>> Can you please provide the example of your solution?
>>
>>
>>
>> Regards
>>
>> Amit
>>
>>
>>
>> On Mon, Jan 18, 2021 at 7:15 PM Boris Litvak <boris.lit...@skf.com>
>> wrote:
>>
>> Hi Amit,
>>
>>
>>
>> Why don’t you just map()/mapXXX() the kafkaDf with a mapping function
>> that reads the paths?
>>
>> Also, do you really have to read the json into an additional dataframe?
>>
>>
>>
>> Thanks, Boris
>>
>>
>>
>> *From:* Amit Joshi <mailtojoshia...@gmail.com>
>> *Sent:* Monday, 18 January 2021 15:04
>> *To:* spark-user <user@spark.apache.org>
>> *Subject:* [Spark Structured Streaming] Processing the data path coming
>> from kafka.
>>
>>
>>
>> Hi,
>>
>>
>>
>> I have a use case where the file paths of JSON records stored in S3
>> arrive as messages in Kafka. I have to process the data using Spark
>> Structured Streaming.
>>
>>
>>
>> The design I have in mind is as follows:
>>
>> 1. In Spark Structured Streaming, read the Kafka message containing the
>> data path.
>>
>> 2. Collect the message records on the driver. (Messages are small in size.)
>>
>> 3. Create the dataframe from the data location.
>>
>>
>>
>> kafkaDf.select($"value".cast(StringType))
>>   .writeStream.foreachBatch((batchDf: DataFrame, batchId: Long) => {
>>     // rough code
>>     // collect to driver
>>     val records = batchDf.collect()
>>     // create dataframe and process
>>     records.foreach((rec: Row) => {
>>       println("records:######################", rec.toString())
>>       val path = rec.getAs[String]("data_path")
>>       val dfToProcess = spark.read.json(path)
>>       ....
>>     })
>>   })
>>
>> I would like to know your views on whether this approach is fine,
>> specifically whether there is any problem with creating the dataframe
>> after calling collect.
>>
>> If there is a better approach, please let me know.
>>
>>
>>
>> Regards
>>
>> Amit Joshi
>>
>>
