Thanks Todd. This is what I did before creating the DF on top of that file:

val exists = xmlFileExists(broadcastStagingConfig.xmlFilePath)
if (!exists) {
  println(s"\nError: The xml file ${broadcastStagingConfig.xmlFilePath} does not exist, aborting!\n")
  sys.exit(1)
}
...
def xmlFileExists(hdfsPath: String): Boolean = {
  val hadoopConf = new org.apache.hadoop.conf.Configuration()
  val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
  fs.exists(new org.apache.hadoop.fs.Path(hdfsPath))
}
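
One caveat worth noting: fs.exists(...) also returns true if a directory of the same name exists. A minimal variant (assuming the same Hadoop classes are on the classpath, and a hypothetical helper name) that additionally checks the path is a regular file:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Returns true only if the path exists AND is a regular file,
// since fs.exists(...) alone is also true for directories.
def xmlFileIsFile(pathStr: String): Boolean = {
  val fs = FileSystem.get(new Configuration())
  val p = new Path(pathStr)
  fs.exists(p) && fs.getFileStatus(p).isFile
}
```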

And checked it. It works.
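
A more general alternative to pre-checking the path is to wrap the read itself in scala.util.Try: Spark resolves the input path when the DataFrame is created, so a missing file should surface as an AnalysisException at read time rather than at the first action. A sketch, reusing the xml options and variable names from the code earlier in this thread (these names are assumptions, not a tested implementation):

```scala
import scala.util.{Try, Success, Failure}
import org.apache.spark.sql.{AnalysisException, DataFrame}

// Spark resolves the input path eagerly when the DataFrame is
// created, so a missing file fails here, not at the first action.
val maybeDF: Try[DataFrame] = Try {
  spark.read.
    format("com.databricks.spark.xml").
    option("rootTag", "hierarchy").
    option("rowTag", "sms_request").
    load(broadcastStagingConfig.xmlFilePath)
}

maybeDF match {
  case Success(df) =>
    df.createOrReplaceTempView("tmp")   // carry on as before
  case Failure(e: AnalysisException) =>
    println(s"\nError: cannot read ${broadcastStagingConfig.xmlFilePath}: ${e.getMessage}, aborting!\n")
    sys.exit(1)
  case Failure(e) => throw e
}
```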

Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 5 May 2020 at 17:54, Todd Nist <tsind...@gmail.com> wrote:

> Could you do something like this prior to calling the action.
>
> // Create FileSystem object from Hadoop Configuration
> val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
> // This method returns Boolean (true if the file exists, false if it doesn't)
> val fileExists = fs.exists(new Path("<path_to_file>"))
> if (fileExists) println("File exists!")
> else println("File doesn't exist!")
>
> Not sure that will help you or not, just a thought.
>
> -Todd
>
>
>
>
> On Tue, May 5, 2020 at 11:45 AM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Thanks Brandon!
>>
>> I should have remembered that.
>>
>> Basically the code exits with sys.exit(1) if it cannot find the file.
>>
>> I guess there is no easy way of validating the DF except actioning it with
>> show(1,0) etc. and checking whether it works?
>>
>> Regards,
>>
>> Dr Mich Talebzadeh
>>
>>
>> On Tue, 5 May 2020 at 16:41, Brandon Geise <brandonge...@gmail.com>
>> wrote:
>>
>>> You could use the Hadoop API and check if the file exists.
>>>
>>>
>>>
>>> *From: *Mich Talebzadeh <mich.talebza...@gmail.com>
>>> *Date: *Tuesday, May 5, 2020 at 11:25 AM
>>> *To: *"user @spark" <user@spark.apache.org>
>>> *Subject: *Exception handling in Spark
>>>
>>>
>>>
>>> Hi,
>>>
>>>
>>>
>>> As I understand it, exception handling in Spark only makes sense when one
>>> attempts an action, as opposed to lazy transformations?
>>>
>>>
>>>
>>> Let us assume that I am reading an XML file from an HDFS directory and
>>> creating a dataframe DF on it
>>>
>>>
>>>
>>> val broadcastValue = "123456789"  // I assume this will be sent as a
>>> constant for the batch
>>>
>>> // Create a DF on top of XML
>>> val df = spark.read.
>>>                 format("com.databricks.spark.xml").
>>>                 option("rootTag", "hierarchy").
>>>                 option("rowTag", "sms_request").
>>>                 load("/tmp/broadcast.xml")
>>>
>>> val newDF = df.withColumn("broadcastid", lit(broadcastValue))
>>>
>>> newDF.createOrReplaceTempView("tmp")
>>>
>>>   // Put data in Hive table
>>>   //
>>>   sqltext = """
>>>   INSERT INTO TABLE michtest.BroadcastStaging PARTITION
>>> (broadcastid="123456", brand)
>>>   SELECT
>>>           ocis_party_id AS partyId
>>>         , target_mobile_no AS phoneNumber
>>>         , brand
>>>         , broadcastid
>>>   FROM tmp
>>>   """
>>> //
>>>
>>> // Here I am performing a collection
>>>
>>> try  {
>>>
>>>          spark.sql(sqltext)
>>>
>>> } catch {
>>>
>>>     case e: SQLException => e.printStackTrace
>>>
>>>     sys.exit()
>>>
>>> }
>>>
>>>
>>>
>>> Now the issue I have is: what if the xml file /tmp/broadcast.xml does not
>>> exist or has been deleted? I won't be able to catch the error until the
>>> Hive table is populated. Of course I can write a shell script to check that
>>> the file exists before running the job, or perform a small action like
>>> df.show(1,0). Are there more general alternatives?
>>>
>>>
>>>
>>> Thanks
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>
