Hi Mich,

Thanks for your reply. Please advise whether the insert query that I need to substitute should look like the below:
Insert into table(a,b,c) values (?, get_function_value(?), ?)

In the statement above:

?                  : refers to a value from the dataframe columns
get_function_value : refers to the database function to which one of the dataframe columns is passed as input

Thanks
Anshul

On Fri, Jun 18, 2021 at 4:29 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> I gather you mean using JDBC to write to the Oracle table?
>
> Spark provides a unified framework to write to any JDBC-compliant database.
>
>     def writeTableWithJDBC(dataFrame, url, tableName, user, password, driver, mode):
>         try:
>             dataFrame. \
>                 write. \
>                 format("jdbc"). \
>                 option("url", url). \
>                 option("dbtable", tableName). \
>                 option("user", user). \
>                 option("password", password). \
>                 option("driver", driver). \
>                 mode(mode). \
>                 save()
>         except Exception as e:
>             print(f"""{e}, quitting""")
>             sys.exit(1)
>
> and how to call it:
>
>     def loadIntoOracleTable(self, df2):
>         # write to Oracle table; column names all uppercase, not mixed case,
>         # and <= 30 characters in version 12.1
>         tableName = self.config['OracleVariables']['yearlyAveragePricesAllTable']
>         fullyQualifiedTableName = self.config['OracleVariables']['dbschema'] + '.' + tableName
>         user = self.config['OracleVariables']['oracle_user']
>         password = self.config['OracleVariables']['oracle_password']
>         driver = self.config['OracleVariables']['oracle_driver']
>         mode = self.config['OracleVariables']['mode']
>         s.writeTableWithJDBC(df2, oracle_url, fullyQualifiedTableName, user, password, driver, mode)
>         print(f"""created {self.config['OracleVariables']['yearlyAveragePricesAllTable']}""")
>         # read the data back to ensure it all loaded OK
>         fetchsize = self.config['OracleVariables']['fetchsize']
>         read_df = s.loadTableFromJDBC(self.spark, oracle_url, fullyQualifiedTableName, user, password, driver, fetchsize)
>         # check that all rows are there
>         if df2.subtract(read_df).count() == 0:
>             print("Data has been loaded OK to Oracle table")
>         else:
>             print("Data could not be loaded to Oracle table, quitting")
>             sys.exit(1)
>
> In the statement where it says
>
>     option("dbtable", tableName). \
>
> you can replace *tableName* with the equivalent SQL insert statement.
>
> You will need the JDBC driver for Oracle, say ojdbc6.jar, referenced in $SPARK_HOME/conf/spark-defaults.conf:
>
>     spark.driver.extraClassPath /home/hduser/jars/jconn4.jar:/home/hduser/jars/ojdbc6.jar
>
> HTH
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> On Fri, 18 Jun 2021 at 20:49, Anshul Kala <anshul.k...@gmail.com> wrote:
>
>> Hi All,
>>
>> I am using Spark to ingest data from a file into an Oracle database table. For one of the fields, the value to be populated is generated by a function that is written in the database.
>>
>> The input to the function is one of the fields of the dataframe.
>>
>> I wanted to use the Spark JDBC write to perform the operation, which generates the insert query at the back end.
>>
>> For example, it can generate the insert query as:
>>
>>     Insert into table values (?, ?, getfunctionvalue(?))
>>
>> Please advise if this is possible in Spark and, if yes, how it can be done.
>>
>> This is a little urgent for me, so any help is appreciated.
>>
>> Thanks
>> Anshul
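[Editor's note, not from the thread: the placeholder-style insert Anshul describes — `values (?, get_function_value(?), ?)`, where the `?` markers are bound per row and the function runs inside the database — can be sketched in a self-contained, runnable way using Python's sqlite3 module as a stand-in for Oracle/JDBC. The table `t`, the registered `get_function_value`, and its upper-casing logic are all hypothetical illustrations; an Oracle deployment would instead call a PL/SQL function through a JDBC batch.]

```python
import sqlite3

# SQLite stands in for Oracle purely for illustration.
conn = sqlite3.connect(":memory:")

# Register a stand-in for the database-side function (hypothetical logic:
# in Oracle this would be a PL/SQL function already present in the schema).
conn.create_function("get_function_value", 1, lambda v: v.upper())

conn.execute("CREATE TABLE t (a INTEGER, b TEXT, c TEXT)")

# Stand-in for the dataframe rows being ingested.
rows = [(1, "alpha", "x"), (2, "beta", "y")]

# Each ? is bound per row; get_function_value executes inside the database,
# matching the shape of the insert statement discussed in the thread.
conn.executemany(
    "INSERT INTO t (a, b, c) VALUES (?, get_function_value(?), ?)", rows
)
conn.commit()

print(conn.execute("SELECT a, b, c FROM t ORDER BY a").fetchall())
# -> [(1, 'ALPHA', 'x'), (2, 'BETA', 'y')]
```

The key point the sketch shows is that the function call is part of the SQL text, not of the bound values: the driver sends only the raw column values, and the database applies the function during the insert.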