Re: Insert into table where one of the values is derived from a DB function using Spark

2021-06-20 Thread Mich Talebzadeh
Actually I found a solution to this issue

*Challenge*

Insert data from a Spark dataframe when one or more columns in the Oracle
table are derived columns, dependent on data in one or more dataframe
columns.

Standard JDBC from Spark to Oracle does a batch insert of the dataframe into
Oracle, *so it cannot handle these derived columns*. Refer below:

dataFrame. \
    write. \
    format("jdbc"). \
    option("url", url of Oracle). \
    option("dbtable", schema.tableName). \
    option("user", user). \
    option("password", password). \
    option("driver", Oracle driver). \
    mode(mode). \
    save()

This writes the whole content of the dataframe to the Oracle table. You
cannot replace schema.tableName with an INSERT statement.

*Possible solution*


   1. We need a cursor-based solution: create a cursor on the Spark dataframe
   so that we can walk through every row and get the value of each column
   from the dataframe.
   2. Oracle provides the cx_Oracle package. cx_Oracle is a Python extension
   module that enables access to Oracle Database. It conforms to the Python
   Database API 2.0 specification with a considerable number of additions and
   a couple of exclusions. It is maintained by Oracle.
   3. Using cx_Oracle we should be able to create a Connection to Oracle and
   use Connection.cursor() to deal with rows. See below.


This is an example.

Create a connection to Oracle. You need to install the cx_Oracle package for
PySpark.


import cx_Oracle

def loadIntoOracleTableWithCursor(self, df):
    # set Oracle details
    tableName = "randomdata"
    fullyQualifiedTableName = self.config['OracleVariables']['dbschema'] + '.' + tableName
    user = self.config['OracleVariables']['oracle_user']
    password = self.config['OracleVariables']['oracle_password']
    serverName = self.config['OracleVariables']['oracleHost']
    port = self.config['OracleVariables']['oraclePort']
    serviceName = self.config['OracleVariables']['serviceName']
    dsn_tns = cx_Oracle.makedsn(serverName, port, service_name=serviceName)
    # create connection conn
    conn = cx_Oracle.connect(user, password, dsn_tns)
    cursor = conn.cursor()
    # df is the dataframe containing the data. Let us build a cursor on it
    # and walk through every row.
    for row in df.rdd.collect():
        # get individual column values from the dataframe
        id = row[0]
        clustered = row[1]
        scattered = row[2]
        randomised = row[3]
        random_string = row[4]
        small_vc = row[5]
        padding = row[6]
        # Build the INSERT statement to be executed in Oracle. This is what
        # we send for every row. The Oracle table has a column called
        # derived_col that the dataframe does not have; that is the one
        # derived from the dataframe column(s). Here I assign
        # derived_col = cos(id) and pass it in sqlText. Use {} to pass each
        # value, and enclose it in single quotes if the column is of
        # character type.
        sqlText = f"""insert into {fullyQualifiedTableName}
        (id,clustered,scattered,randomised,random_string,small_vc,padding,derived_col)
        values
        ({id},{clustered},{scattered},{randomised},'{random_string}','{small_vc}','{padding}',cos({id}))"""
        print(sqlText)
        cursor.execute(sqlText)
    conn.commit()
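
A note on the sketch above: because the column values are interpolated
directly into sqlText, a stray single quote in a character column would break
the statement. A variant of the same loop using bind variables and
cx_Oracle's executemany() avoids the quoting problem and cuts out the
row-by-row round trips. This is only a sketch, under the same assumptions as
the code above; the :1 to :8 placeholders are positional binds, with id
passed twice so that Oracle can still compute cos(id) on the server side.

rows = [(row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[0])
        for row in df.rdd.collect()]
sqlText = f"""insert into {fullyQualifiedTableName}
(id,clustered,scattered,randomised,random_string,small_vc,padding,derived_col)
values (:1, :2, :3, :4, :5, :6, :7, cos(:8))"""
# one executemany() for the whole batch instead of one round trip per row
cursor.executemany(sqlText, rows)
conn.commit()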

Our dataframe has 10 rows, and id in the Oracle table has been made the
primary key.


scratch...@orasource.mich.LOCAL> CREATE TABLE scratchpad.randomdata
  2  (
  3  "ID" NUMBER(*,0),
  4  "CLUSTERED" NUMBER(*,0),
  5  "SCATTERED" NUMBER(*,0),
  6  "RANDOMISED" NUMBER(*,0),
  7  "RANDOM_STRING" VARCHAR2(50 BYTE),
  8  "SMALL_VC" VARCHAR2(50 BYTE),
  9  "PADDING" VARCHAR2(4000 BYTE),
 10  "DERIVED_COL" FLOAT(126)
 11  );

Table created.
scratch...@orasource.mich.LOCAL> ALTER TABLE scratchpad.randomdata ADD
CONSTRAINT randomdata_PK PRIMARY KEY (ID);
Table altered.

Run it and see the output of print(sqlText):

insert into SCRATCHPAD.randomdata
(id,clustered,scattered,randomised,random_string,small_vc,padding,derived_col)
values
(1,0.0,0.0,2.0,'KZWeqhFWCEPyYngFbyBMWXaSCrUZoLgubbbPIayRnBUbHoWCFJ','         1','xxx',cos(1))

This works fine. It creates the rows and does a commit.


In Oracle, confirm that those 10 rows were added, starting with id = 1:


scratch...@orasource.mich.LOCAL> select count(1) from scratchpad.randomdata;

  COUNT(1)
----------
        10

Re: Insert into table where one of the values is derived from a DB function using Spark

2021-06-19 Thread Sebastian Piu
Another option is to just use plain JDBC (if in Java) in a foreachPartition
call on the dataframe/dataset. Then you get full control of the insert
statement, but you need to open the connection/transaction yourself.
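
For completeness, a rough PySpark sketch of that idea, reusing the cx_Oracle
connection settings shown elsewhere in this thread (user, password, dsn_tns
and fullyQualifiedTableName are assumed to be plain strings captured from the
driver; insert_partition is an illustrative name, not an API):

import cx_Oracle

def insert_partition(rows):
    # one connection per partition, opened on the executor
    conn = cx_Oracle.connect(user, password, dsn_tns)
    cursor = conn.cursor()
    sqlText = f"""insert into {fullyQualifiedTableName}
    (id,clustered,scattered,randomised,random_string,small_vc,padding,derived_col)
    values (:1, :2, :3, :4, :5, :6, :7, cos(:8))"""
    data = [(r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[0]) for r in rows]
    if data:
        cursor.executemany(sqlText, data)
        conn.commit()
    conn.close()

df.rdd.foreachPartition(insert_partition)

This keeps the full control over the insert statement that Sebastian
describes, without collecting the whole dataframe to the driver.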

On Sat, 19 Jun 2021 at 19:33, Mich Talebzadeh 
wrote:

> Hi,
>
> I did some research on this.
>
> The only way one can write to Oracle from Spark is through JDBC (excluding
> other options outside of Spark).
>
> The challenge here is that you have a column, based on the function
> get_function(), that Spark needs to insert. Currently there is no way of
> issuing a traditional single-record INSERT statement from Spark. For
> example, this does not work through Spark:
>
> scratch...@orasource.mich.LOCAL> insert into scratchpad.dummy6 (id)
> values (2);
>
> The batch insert option seems to be the fastest:
>
> # note: the "dbtable" option cannot be replaced with a SQL INSERT!
> df.write. \
>     format("jdbc"). \
>     option("url", oracle_url). \
>     option("user", user). \
>     option("dbtable", "scratchpad.randomdata"). \
>     option("password", password). \
>     option("driver", driver). \
>     mode(mode). \
>     save()
>
> How about creating a cursor on the dataframe?
>
> for row in df.rdd.collect():
>     id = row[0]
>     clustered = row[1]
>     scattered = row[2]
>     randomised = row[3]
>     random_string = row[4]
>     small_vc = row[5]
>     padding = row[6]
>
> This will get the individual column values row by row from the
> dataframe, but by itself it cannot do much with them.
>
> The only option I can see here is to create a staging table EXCLUDING the
> derived column and write to that table.
>
> Next, go to Oracle itself and do an insert/select from the staging table to
> the target table. Let us create table dummy7 in the image of the one
> created by Spark:
>
> scratch...@orasource.mich.LOCAL> create table scratchpad.dummy7 as select
> * from scratchpad.randomdata where 1 = 2;
>
> Table created.
>
> Add a new derived column to it, call it derived_col
>
> scratch...@orasource.mich.LOCAL> alter table scratchpad.dummy7 add
> derived_col float;
>
> Table altered.
>
> Now insert/select from scratchpad.randomdata to scratchpad.dummy7. Let us
> populate the newly added column with cos(id):
>
> scratch...@orasource.mich.LOCAL> insert into scratchpad.dummy7 (id,
> CLUSTERED, SCATTERED, RANDOMISED, RANDOM_STRING, SMALL_VC, PADDING,
> DERIVED_COL)
>   2  select id, CLUSTERED, SCATTERED, RANDOMISED, RANDOM_STRING, SMALL_VC,
> PADDING, *cos(id)* from randomdata;
>
> 10 rows created.
>
> This should work, unless there is a way of inserting columns directly from
> Spark.
>
> HTH
>
> On Fri, 18 Jun 2021 at 22:14, Mich Talebzadeh 
> wrote:
>
>> Well, the challenge is that Spark is best suited to inserting a dataframe
>> into the Oracle table, i.e. a bulk insert,
>>
>> whereas insert into table (column list) values (..) is a single-record
>> insert. Can you try creating a staging table in Oracle without the
>> get_function() column and do a bulk insert from the Spark dataframe to
>> that staging table?
>>
>> HTH
>>
>> Mich
>>
>>
>> On Fri, 18 Jun 2021 at 21:53, Anshul Kala  wrote:
>>
>>>
>>> Hi Mich,
>>>
>>> Thanks for your reply. Please advise: should the insert query that I need
>>> to substitute be like the one below?
>>>
>>> Insert into table(a,b,c) values(?,get_function_value(?),?)
>>>
>>> In the statement above:
>>>
>>>  ? : refers to a value from the dataframe columns
>>> get_function_value : refers to the function to which one of the dataframe
>>> columns is passed as input
>>>
>>>
>>> Thanks
>>> Anshul
>>>
>>>
>>>
>>> On Fri, Jun 18, 2021 at 4:29 PM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 I gather you mean using JDBC to write to the Oracle table?

 Spark provides a unified framework to write to any JDBC
 compliant database.

 def writeTableWithJDBC(dataFrame, url, tableName, user, password,
 driver, mode):
 

Re: Insert into table where one of the values is derived from a DB function using Spark

2021-06-19 Thread Mich Talebzadeh
Hi,

I did some research on this.

The only way one can write to Oracle from Spark is through JDBC (excluding
other options outside of Spark).

The challenge here is that you have a column, based on the function
get_function(), that Spark needs to insert. Currently there is no way of
issuing a traditional single-record INSERT statement from Spark. For example,
this does not work through Spark:

scratch...@orasource.mich.LOCAL> insert into scratchpad.dummy6 (id) values
(2);

The batch insert option seems to be the fastest:

# note: the "dbtable" option cannot be replaced with a SQL INSERT!
df.write. \
    format("jdbc"). \
    option("url", oracle_url). \
    option("user", user). \
    option("dbtable", "scratchpad.randomdata"). \
    option("password", password). \
    option("driver", driver). \
    mode(mode). \
    save()

How about creating a cursor on the dataframe?

for row in df.rdd.collect():
    id = row[0]
    clustered = row[1]
    scattered = row[2]
    randomised = row[3]
    random_string = row[4]
    small_vc = row[5]
    padding = row[6]

This will get the individual column values row by row from the dataframe, but
by itself it cannot do much with them.

The only option I can see here is to create a staging table EXCLUDING the
derived column and write to that table.

Next, go to Oracle itself and do an insert/select from the staging table to
the target table. Let us create table dummy7 in the image of the one created
by Spark:

scratch...@orasource.mich.LOCAL> create table scratchpad.dummy7 as select *
from scratchpad.randomdata where 1 = 2;

Table created.

Add a new derived column to it, call it derived_col

scratch...@orasource.mich.LOCAL> alter table scratchpad.dummy7 add
derived_col float;

Table altered.

Now insert/select from scratchpad.randomdata to scratchpad.dummy7. Let us
populate the newly added column with cos(id):

scratch...@orasource.mich.LOCAL> insert into scratchpad.dummy7 (id,
CLUSTERED, SCATTERED, RANDOMISED, RANDOM_STRING, SMALL_VC, PADDING,
DERIVED_COL)
  2  select id, CLUSTERED, SCATTERED, RANDOMISED, RANDOM_STRING, SMALL_VC,
PADDING, *cos(id)* from randomdata;

10 rows created.

This should work, unless there is a way of inserting columns directly from
Spark.
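
Putting the two steps together from PySpark, a minimal sketch (it reuses
oracle_url, user, password, driver, mode and dsn_tns as configured earlier in
this thread, and simply runs the same insert/select shown above through
cx_Oracle):

# step 1: bulk insert the dataframe into the staging table via JDBC
df.write. \
    format("jdbc"). \
    option("url", oracle_url). \
    option("dbtable", "scratchpad.randomdata"). \
    option("user", user). \
    option("password", password). \
    option("driver", driver). \
    mode(mode). \
    save()

# step 2: let Oracle compute the derived column in an insert/select
import cx_Oracle
conn = cx_Oracle.connect(user, password, dsn_tns)
cursor = conn.cursor()
cursor.execute("""insert into scratchpad.dummy7
    (id, clustered, scattered, randomised, random_string, small_vc, padding, derived_col)
    select id, clustered, scattered, randomised, random_string, small_vc,
           padding, cos(id)
    from scratchpad.randomdata""")
conn.commit()
conn.close()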

HTH



On Fri, 18 Jun 2021 at 22:14, Mich Talebzadeh 
wrote:

> Well, the challenge is that Spark is best suited to inserting a dataframe
> into the Oracle table, i.e. a bulk insert,
>
> whereas insert into table (column list) values (..) is a single-record
> insert. Can you try creating a staging table in Oracle without the
> get_function() column and do a bulk insert from the Spark dataframe to that
> staging table?
>
> HTH
>
> Mich
>
>
>
>
> On Fri, 18 Jun 2021 at 21:53, Anshul Kala  wrote:
>
>>
>> Hi Mich,
>>
>> Thanks for your reply. Please advise: should the insert query that I need
>> to substitute be like the one below?
>>
>> Insert into table(a,b,c) values(?,get_function_value(?),?)
>>
>> In the statement above:
>>
>>  ? : refers to a value from the dataframe columns
>> get_function_value : refers to the function to which one of the dataframe
>> columns is passed as input
>>
>>
>> Thanks
>> Anshul
>>
>>
>>
>> On Fri, Jun 18, 2021 at 4:29 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> I gather you mean using JDBC to write to the Oracle table?
>>>
>>> Spark provides a unified framework to write to any JDBC
>>> compliant database.
>>>
>>> def writeTableWithJDBC(dataFrame, url, tableName, user, password, driver, mode):
>>>     try:
>>>         dataFrame. \
>>>             write. \
>>>             format("jdbc"). \
>>>             option("url", url). \
>>>             option("dbtable", tableName). \
>>>             option("user", user). \
>>>             option("password", password). \
>>>             option("driver", driver). \
>>>             mode(mode). \
>>>             save()
>>>     except Exception as e:
>>>         print(f"""{e}, quitting""")
>>>         sys.exit(1)
>>>
>>> and how to write it
>>>
>>>  

Re: Insert into table where one of the values is derived from a DB function using Spark

2021-06-19 Thread ayan guha
Hi

Why can't this be done by an Oracle insert trigger? Or even a view?
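
For example, with the randomdata table used in this thread, a BEFORE INSERT
trigger could let Oracle populate derived_col itself, so that Spark only
writes the base columns. A sketch only (the trigger name is illustrative):

CREATE OR REPLACE TRIGGER scratchpad.randomdata_bi
BEFORE INSERT ON scratchpad.randomdata
FOR EACH ROW
BEGIN
  :NEW.derived_col := cos(:NEW.id);
END;
/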

On Sat, 19 Jun 2021 at 7:17 am, Mich Talebzadeh 
wrote:

> Well, the challenge is that Spark is best suited to inserting a dataframe
> into the Oracle table, i.e. a bulk insert,
>
> whereas insert into table (column list) values (..) is a single-record
> insert. Can you try creating a staging table in Oracle without the
> get_function() column and do a bulk insert from the Spark dataframe to that
> staging table?
>
> HTH
>
> Mich
>
>
>
>
> On Fri, 18 Jun 2021 at 21:53, Anshul Kala  wrote:
>
>>
>> Hi Mich,
>>
>> Thanks for your reply. Please advise: should the insert query that I need
>> to substitute be like the one below?
>>
>> Insert into table(a,b,c) values(?,get_function_value(?),?)
>>
>> In the statement above:
>>
>>  ? : refers to a value from the dataframe columns
>> get_function_value : refers to the function to which one of the dataframe
>> columns is passed as input
>>
>>
>> Thanks
>> Anshul
>>
>>
>>
>> On Fri, Jun 18, 2021 at 4:29 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> I gather you mean using JDBC to write to the Oracle table?
>>>
>>> Spark provides a unified framework to write to any JDBC
>>> compliant database.
>>>
>>> def writeTableWithJDBC(dataFrame, url, tableName, user, password, driver, mode):
>>>     try:
>>>         dataFrame. \
>>>             write. \
>>>             format("jdbc"). \
>>>             option("url", url). \
>>>             option("dbtable", tableName). \
>>>             option("user", user). \
>>>             option("password", password). \
>>>             option("driver", driver). \
>>>             mode(mode). \
>>>             save()
>>>     except Exception as e:
>>>         print(f"""{e}, quitting""")
>>>         sys.exit(1)
>>>
>>> and how to write it
>>>
>>> def loadIntoOracleTable(self, df2):
>>>     # write to Oracle table; all uppercase, not mixed case, and column
>>>     # names <= 30 characters in version 12.1
>>>     tableName = self.config['OracleVariables']['yearlyAveragePricesAllTable']
>>>     fullyQualifiedTableName = self.config['OracleVariables']['dbschema'] + '.' + tableName
>>>     user = self.config['OracleVariables']['oracle_user']
>>>     password = self.config['OracleVariables']['oracle_password']
>>>     driver = self.config['OracleVariables']['oracle_driver']
>>>     mode = self.config['OracleVariables']['mode']
>>>     s.writeTableWithJDBC(df2, oracle_url, fullyQualifiedTableName, user, password, driver, mode)
>>>     print(f"""created {config['OracleVariables']['yearlyAveragePricesAllTable']}""")
>>>     # read the data back to ensure it all loaded OK
>>>     fetchsize = self.config['OracleVariables']['fetchsize']
>>>     read_df = s.loadTableFromJDBC(self.spark, oracle_url, fullyQualifiedTableName, user, password, driver, fetchsize)
>>>     # check that all rows are there
>>>     if df2.subtract(read_df).count() == 0:
>>>         print("Data has been loaded OK to Oracle table")
>>>     else:
>>>         print("Data could not be loaded to Oracle table, quitting")
>>>         sys.exit(1)
>>>
>>> in the statement where it says
>>>
>>>  option("dbtable", tableName). \
>>>
>>> You can replace *tableName* with the equivalent SQL insert statement
>>>
>>> You will need the JDBC driver for Oracle, say ojdbc6.jar, referenced in
>>> $SPARK_HOME/conf/spark-defaults.conf:
>>>
>>> spark.driver.extraClassPath
>>>  /home/hduser/jars/jconn4.jar:/home/hduser/jars/ojdbc6.jar
>>>
>>> HTH
>>>
>>>
>>>
>>> On Fri, 18 Jun 2021 at 20:49, Anshul Kala  wrote:
>>>
 Hi All,

I am using Spark to ingest data from a file into an Oracle database table.
For one of the fields, the value to be populated is generated from a function
that is written in the database.

The input to the function is one of the fields of the dataframe.

I wanted to use the Spark JDBC write to perform the operation, which
generates the insert query at the back end.

For example, it can generate the insert query as:

 

Re: Insert into table where one of the values is derived from a DB function using Spark

2021-06-18 Thread Mich Talebzadeh
Well, the challenge is that Spark is best suited to inserting a dataframe into
the Oracle table, i.e. a bulk insert,

whereas insert into table (column list) values (..) is a single-record insert.
Can you try creating a staging table in Oracle without the get_function()
column and do a bulk insert from the Spark dataframe to that staging table?

HTH

Mich




On Fri, 18 Jun 2021 at 21:53, Anshul Kala  wrote:

>
> Hi Mich,
>
> Thanks for your reply. Please advise: should the insert query that I need
> to substitute be like the one below?
>
> Insert into table(a,b,c) values(?,get_function_value(?),?)
>
> In the statement above:
>
>  ? : refers to a value from the dataframe columns
> get_function_value : refers to the function to which one of the dataframe
> columns is passed as input
>
>
> Thanks
> Anshul
>
>
>
> On Fri, Jun 18, 2021 at 4:29 PM Mich Talebzadeh 
> wrote:
>
>> I gather you mean using JDBC to write to the Oracle table?
>>
>> Spark provides a unified framework to write to any JDBC
>> compliant database.
>>
>> def writeTableWithJDBC(dataFrame, url, tableName, user, password, driver, mode):
>>     try:
>>         dataFrame. \
>>             write. \
>>             format("jdbc"). \
>>             option("url", url). \
>>             option("dbtable", tableName). \
>>             option("user", user). \
>>             option("password", password). \
>>             option("driver", driver). \
>>             mode(mode). \
>>             save()
>>     except Exception as e:
>>         print(f"""{e}, quitting""")
>>         sys.exit(1)
>>
>> and how to write it
>>
>> def loadIntoOracleTable(self, df2):
>>     # write to Oracle table; all uppercase, not mixed case, and column
>>     # names <= 30 characters in version 12.1
>>     tableName = self.config['OracleVariables']['yearlyAveragePricesAllTable']
>>     fullyQualifiedTableName = self.config['OracleVariables']['dbschema'] + '.' + tableName
>>     user = self.config['OracleVariables']['oracle_user']
>>     password = self.config['OracleVariables']['oracle_password']
>>     driver = self.config['OracleVariables']['oracle_driver']
>>     mode = self.config['OracleVariables']['mode']
>>     s.writeTableWithJDBC(df2, oracle_url, fullyQualifiedTableName, user, password, driver, mode)
>>     print(f"""created {config['OracleVariables']['yearlyAveragePricesAllTable']}""")
>>     # read the data back to ensure it all loaded OK
>>     fetchsize = self.config['OracleVariables']['fetchsize']
>>     read_df = s.loadTableFromJDBC(self.spark, oracle_url, fullyQualifiedTableName, user, password, driver, fetchsize)
>>     # check that all rows are there
>>     if df2.subtract(read_df).count() == 0:
>>         print("Data has been loaded OK to Oracle table")
>>     else:
>>         print("Data could not be loaded to Oracle table, quitting")
>>         sys.exit(1)
>>
>> in the statement where it says
>>
>>  option("dbtable", tableName). \
>>
>> You can replace *tableName* with the equivalent SQL insert statement
>>
>> You will need the JDBC driver for Oracle, say ojdbc6.jar, referenced in
>> $SPARK_HOME/conf/spark-defaults.conf:
>>
>> spark.driver.extraClassPath
>>  /home/hduser/jars/jconn4.jar:/home/hduser/jars/ojdbc6.jar
>>
>> HTH
>>
>>
>>
>> On Fri, 18 Jun 2021 at 20:49, Anshul Kala  wrote:
>>
>>> Hi All,
>>>
>>> I am using Spark to ingest data from a file into an Oracle database
>>> table. For one of the fields, the value to be populated is generated from
>>> a function that is written in the database.
>>>
>>> The input to the function is one of the fields of the dataframe.
>>>
>>> I wanted to use the Spark JDBC write to perform the operation, which
>>> generates the insert query at the back end.
>>>
>>> For example, it can generate the insert query as:
>>>
>>> Insert into table values (?,?, getfunctionvalue(?) )
>>>
>>> Please advise if it is possible in Spark and, if yes, how it can be done.
>>>
>>> This is a little urgent for me, so any help is appreciated.
>>>
>>> Thanks
>>> Anshul
>>>
>>


Re: Insert into table where one of the values is derived from a DB function using Spark

2021-06-18 Thread Anshul Kala
Hi Mich,

Thanks for your reply. Please advise: should the insert query that I need to
substitute be like the one below?

Insert into table(a,b,c) values(?,get_function_value(?),?)

In the statement above:

 ? : refers to a value from the dataframe columns
get_function_value : refers to the function to which one of the dataframe
columns is passed as input


Thanks
Anshul



On Fri, Jun 18, 2021 at 4:29 PM Mich Talebzadeh 
wrote:

> I gather you mean using JDBC to write to the Oracle table?
>
> Spark provides a unified framework to write to any JDBC compliant database.
>
> def writeTableWithJDBC(dataFrame, url, tableName, user, password, driver, mode):
>     try:
>         dataFrame. \
>             write. \
>             format("jdbc"). \
>             option("url", url). \
>             option("dbtable", tableName). \
>             option("user", user). \
>             option("password", password). \
>             option("driver", driver). \
>             mode(mode). \
>             save()
>     except Exception as e:
>         print(f"""{e}, quitting""")
>         sys.exit(1)
>
> and how to write it
>
> def loadIntoOracleTable(self, df2):
>     # write to Oracle table; all uppercase, not mixed case, and column
>     # names <= 30 characters in version 12.1
>     tableName = self.config['OracleVariables']['yearlyAveragePricesAllTable']
>     fullyQualifiedTableName = self.config['OracleVariables']['dbschema'] + '.' + tableName
>     user = self.config['OracleVariables']['oracle_user']
>     password = self.config['OracleVariables']['oracle_password']
>     driver = self.config['OracleVariables']['oracle_driver']
>     mode = self.config['OracleVariables']['mode']
>     s.writeTableWithJDBC(df2, oracle_url, fullyQualifiedTableName, user, password, driver, mode)
>     print(f"""created {config['OracleVariables']['yearlyAveragePricesAllTable']}""")
>     # read the data back to ensure it all loaded OK
>     fetchsize = self.config['OracleVariables']['fetchsize']
>     read_df = s.loadTableFromJDBC(self.spark, oracle_url, fullyQualifiedTableName, user, password, driver, fetchsize)
>     # check that all rows are there
>     if df2.subtract(read_df).count() == 0:
>         print("Data has been loaded OK to Oracle table")
>     else:
>         print("Data could not be loaded to Oracle table, quitting")
>         sys.exit(1)
>
> in the statement where it says
>
>  option("dbtable", tableName). \
>
> You can replace *tableName* with the equivalent SQL insert statement
>
> You will need the JDBC driver for Oracle, say ojdbc6.jar, referenced in
> $SPARK_HOME/conf/spark-defaults.conf:
>
> spark.driver.extraClassPath
>  /home/hduser/jars/jconn4.jar:/home/hduser/jars/ojdbc6.jar
>
> HTH
>
>
>
> On Fri, 18 Jun 2021 at 20:49, Anshul Kala  wrote:
>
>> Hi All,
>>
>> I am using Spark to ingest data from a file into an Oracle database table.
>> For one of the fields, the value to be populated is generated from a
>> function that is written in the database.
>>
>> The input to the function is one of the fields of the dataframe.
>>
>> I wanted to use the Spark JDBC write to perform the operation, which
>> generates the insert query at the back end.
>>
>> For example, it can generate the insert query as:
>>
>> Insert into table values (?,?, getfunctionvalue(?) )
>>
>> Please advise if it is possible in Spark and, if yes, how it can be done.
>>
>> This is a little urgent for me, so any help is appreciated.
>>
>> Thanks
>> Anshul
>>
>


Re: Insert into table where one of the values is derived from a DB function using Spark

2021-06-18 Thread Mich Talebzadeh
I gather you mean using JDBC to write to the Oracle table?

Spark provides a unified framework to write to any JDBC compliant database.

import sys

def writeTableWithJDBC(dataFrame, url, tableName, user, password, driver, mode):
    try:
        dataFrame. \
            write. \
            format("jdbc"). \
            option("url", url). \
            option("dbtable", tableName). \
            option("user", user). \
            option("password", password). \
            option("driver", driver). \
            mode(mode). \
            save()
    except Exception as e:
        print(f"""{e}, quitting""")
        sys.exit(1)

and how to write it

def loadIntoOracleTable(self, df2):
    # write to Oracle table; all uppercase, not mixed case, and column
    # names <= 30 characters in version 12.1
    tableName = self.config['OracleVariables']['yearlyAveragePricesAllTable']
    fullyQualifiedTableName = self.config['OracleVariables']['dbschema'] + '.' + tableName
    user = self.config['OracleVariables']['oracle_user']
    password = self.config['OracleVariables']['oracle_password']
    driver = self.config['OracleVariables']['oracle_driver']
    mode = self.config['OracleVariables']['mode']

    s.writeTableWithJDBC(df2, oracle_url, fullyQualifiedTableName, user, password, driver, mode)
    print(f"""created {config['OracleVariables']['yearlyAveragePricesAllTable']}""")
    # read the data back to ensure it all loaded OK
    fetchsize = self.config['OracleVariables']['fetchsize']
    read_df = s.loadTableFromJDBC(self.spark, oracle_url, fullyQualifiedTableName, user, password, driver, fetchsize)
    # check that all rows are there
    if df2.subtract(read_df).count() == 0:
        print("Data has been loaded OK to Oracle table")
    else:
        print("Data could not be loaded to Oracle table, quitting")
        sys.exit(1)

in the statement where it says

 option("dbtable", tableName). \

You can replace *tableName* with the equivalent SQL insert statement

You will need the JDBC driver for Oracle, say ojdbc6.jar, referenced in
$SPARK_HOME/conf/spark-defaults.conf:

spark.driver.extraClassPath
 /home/hduser/jars/jconn4.jar:/home/hduser/jars/ojdbc6.jar

HTH



On Fri, 18 Jun 2021 at 20:49, Anshul Kala  wrote:

> Hi All,
>
> I am using Spark to ingest data from a file into an Oracle database table.
> For one of the fields, the value to be populated is generated from a
> function that is written in the database.
>
> The input to the function is one of the fields of the dataframe.
>
> I wanted to use the Spark JDBC write to perform the operation, which
> generates the insert query at the back end.
>
> For example, it can generate the insert query as:
>
> Insert into table values (?,?, getfunctionvalue(?) )
>
> Please advise if it is possible in Spark and, if yes, how it can be done.
>
> This is a little urgent for me, so any help is appreciated.
>
> Thanks
> Anshul
>


Insert into table where one of the values is derived from a DB function using Spark

2021-06-18 Thread Anshul Kala
Hi All,

I am using Spark to ingest data from a file into an Oracle database table.
For one of the fields, the value to be populated is generated from a function
that is written in the database.

The input to the function is one of the fields of the dataframe.

I wanted to use the Spark JDBC write to perform the operation, which
generates the insert query at the back end.

For example, it can generate the insert query as:

Insert into table values (?,?, getfunctionvalue(?) )

Please advise if it is possible in Spark and, if yes, how it can be done.

This is a little urgent for me, so any help is appreciated.

Thanks
Anshul