Hi Mich,

Thanks for your reply. To confirm, the insert query that I need to
substitute would look like the one below:

Insert into table (a, b, c) values (?, get_function_value(?), ?)

In the statement above:

?                  : a value taken from the DataFrame columns
get_function_value : the database function to which one of the DataFrame
                     columns is passed as input
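
In case it helps, the fallback I have in mind if the DataFrame writer
cannot express this is to run the insert myself from each partition.
Here is a rough sketch of that idea (hypothetical: the python-oracledb
driver, the connection details and the table/column names are
placeholders, not working values):

import oracledb

def insert_partition(rows):
    # each partition opens its own connection to Oracle
    conn = oracledb.connect(user="app_user", password="app_pwd",
                            dsn="dbhost:1521/orclpdb1")
    cursor = conn.cursor()
    # Oracle evaluates the DB-side function for every row
    sql = ("INSERT INTO mytable (a, b, c) "
           "VALUES (:1, get_function_value(:2), :3)")
    cursor.executemany(sql, [(r.a, r.b, r.c) for r in rows])
    conn.commit()
    conn.close()

df.foreachPartition(insert_partition)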


Thanks
Anshul



On Fri, Jun 18, 2021 at 4:29 PM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> I gather you mean using JDBC to write to the Oracle table?
>
> Spark provides a unified framework to write to any JDBC compliant database.
>
> import sys
>
> def writeTableWithJDBC(dataFrame, url, tableName, user, password, driver, mode):
>     # write a DataFrame to any JDBC-compliant database table
>     try:
>         dataFrame. \
>             write. \
>             format("jdbc"). \
>             option("url", url). \
>             option("dbtable", tableName). \
>             option("user", user). \
>             option("password", password). \
>             option("driver", driver). \
>             mode(mode). \
>             save()
>     except Exception as e:
>         print(f"""{e}, quitting""")
>         sys.exit(1)
>
> and here is how to call it:
>
>     def loadIntoOracleTable(self, df2):
>         # write to the Oracle table; names all uppercase, not mixed case,
>         # and column names <= 30 characters in version 12.1
>         tableName = self.config['OracleVariables']['yearlyAveragePricesAllTable']
>         fullyQualifiedTableName = self.config['OracleVariables']['dbschema'] + '.' + tableName
>         user = self.config['OracleVariables']['oracle_user']
>         password = self.config['OracleVariables']['oracle_password']
>         driver = self.config['OracleVariables']['oracle_driver']
>         mode = self.config['OracleVariables']['mode']
>         s.writeTableWithJDBC(df2, oracle_url, fullyQualifiedTableName, user, password, driver, mode)
>         print(f"""created {tableName}""")
>         # read the data back to ensure it all loaded OK
>         fetchsize = self.config['OracleVariables']['fetchsize']
>         read_df = s.loadTableFromJDBC(self.spark, oracle_url, fullyQualifiedTableName, user, password, driver, fetchsize)
>         # check that all rows are there
>         if df2.subtract(read_df).count() == 0:
>             print("Data has been loaded OK to Oracle table")
>         else:
>             print("Data could not be loaded to Oracle table, quitting")
>             sys.exit(1)
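>
> The oracle_url referenced above is the usual Oracle thin-driver JDBC
> URL, something along these lines (host, port and SID here are
> placeholders for your environment):
>
> oracle_url = "jdbc:oracle:thin:@dbhost:1521:mysid"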
>
> In the statement where it says
>
>              option("dbtable", tableName). \
>
> you can replace *tableName* with the equivalent SQL insert statement.
>
> You will need the JDBC driver for Oracle, say ojdbc6.jar, declared in
> $SPARK_HOME/conf/spark-defaults.conf:
>
> spark.driver.extraClassPath /home/hduser/jars/jconn4.jar:/home/hduser/jars/ojdbc6.jar
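>
> Alternatively you can ship the driver jar at submit time, something
> like this (the jar path is whatever matches your environment):
>
> spark-submit --driver-class-path /home/hduser/jars/ojdbc6.jar \
>              --jars /home/hduser/jars/ojdbc6.jar ...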
>
> HTH
>
>
> On Fri, 18 Jun 2021 at 20:49, Anshul Kala <anshul.k...@gmail.com> wrote:
>
>> Hi All,
>>
>> I am using Spark to ingest data from a file into an Oracle database
>> table. For one of the fields, the value to be populated is generated
>> by a function that is written in the database.
>>
>> The input to the function is one of the fields of the DataFrame.
>>
>> I wanted to use Spark's JDBC write to perform the operation, which
>> generates the insert query at the back end.
>>
>> For example, it can generate the insert query as:
>>
>> Insert into table values (?, ?, getfunctionvalue(?))
>>
>> Please advise whether this is possible in Spark and, if so, how it can
>> be done.
>>
>> This is a little urgent for me, so any help is appreciated.
>>
>> Thanks
>> Anshul
>>
>
