I'm running the following code in an attempt to import some tables from our
Oracle DB into Spark (2.0.2) and then save them as Parquet tables in S3
(using S3A).  The code runs and does create queryable tables in our Hive
Metastore, but it only opens one connection to Oracle (I was expecting a
number of connections equal to "numPartitions" or the number of tasks the
workers can start simultaneously, whichever is less).  It also fails when
importing a large table (the limit seems to be roughly the driver memory
size, so I'm guessing it is pulling the table into driver memory and failing
when it doesn't fit).
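As a sanity check (sketch only; "data" here stands for the partitioned JDBC
DataFrame built in the script below), the partition count of the resulting
DataFrame should show whether the read was actually split:

print(data.rdd.getNumPartitions())  # I'd expect 40 if the partitioning options took effect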

This is obviously both slow and inefficient.  Am I doing (or not doing)
something wrong that is forcing this program to run entirely on the driver?
Or is there something else going on here?  The single Oracle query does seem
to come from one of the workers, and a small amount of memory usage shows up
on the workers, but they don't seem to be doing any significant work.
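
For diagnosis, the query plan can also be dumped (again just a sketch,
against the same "data" DataFrame as in the script below):

data.explain(True)  # prints the parsed, analyzed, optimized, and physical plans

I'd look at the JDBC scan in the physical plan to see whether the
partitioning made it through.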

I appreciate any insight I can get here, as I was under the impression that
this should use worker resources and run in parallel sort of automagically.

Thanks!


from pyspark import SparkConf, sql
import sys


def main(argv):
    conf = SparkConf()
    conf.setMaster("spark://<spark-master>")
    spark = (sql.SparkSession.builder
             .config(conf=conf)
             .appName("Writing tables to S3")
             .enableHiveSupport()
             .getOrCreate())
    sc = spark.sparkContext

    # Print the effective Spark configuration, sorted by key.
    for x in sorted(sc.getConf().getAll(), key=lambda x: x[0]):
        print(x)

    # tables.txt holds one table name per line.
    lines = []
    with open("tables.txt") as tablenames:
        for line in tablenames:
            lines.append(line.rstrip('\n'))

    for line in lines:
        print("Reading " + line)

        # Fetch the smallest <table>_ID to use as the lower partition bound.
        query_min = (spark.read.format("jdbc")
                     .option("user", "user")
                     .option("password", "pass")
                     .option("url", "<Oracle-url>")
                     .option("dbtable",
                             "(select min(" + line + "_ID) from <dbname>." + line + ")")
                     .load())
        query_min.show()

        # Fetch the largest <table>_ID to use as the upper partition bound.
        query_max = (spark.read.format("jdbc")
                     .option("user", "user")
                     .option("password", "pass")
                     .option("url", "<Oracle-url>")
                     .option("dbtable",
                             "(select max(" + line + "_ID) from <dbname>." + line + ")")
                     .load())
        query_max.show()

        # Read the whole table, split on <table>_ID into 40 partitions.
        data = (spark.read.format("jdbc")
                .option("user", "user")
                .option("password", "pass")
                .option("url", "<Oracle-url>")
                .option("dbtable", "<dbname>." + line)
                .option("partitionColumn", line + "_ID")
                .option("lowerBound", int(query_min.head()[0]))
                .option("upperBound", int(query_max.head()[0]))
                .option("numPartitions", "40")
                .load())

        print("writing " + line)
        data.write.mode("overwrite").format("parquet").saveAsTable("s3." + line)
        print("done writing " + line)


if __name__ == '__main__':
    main(sys.argv[1:])
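
One variant I'm considering (sketch only, untested; same placeholder URL,
credentials, and table names as above): fetch both bounds in a single JDBC
round trip instead of two separate reads, and raise the JDBC "fetchsize",
since the Oracle driver defaults to a small number of rows per round trip:

# Sketch: assumes "spark" and "line" exist as in the loop above.
# Oracle accepts an unaliased inline view, so both bounds come back in one query.
bounds = (spark.read.format("jdbc")
          .option("user", "user")
          .option("password", "pass")
          .option("url", "<Oracle-url>")
          .option("dbtable",
                  "(select min(" + line + "_ID) lo, max(" + line + "_ID) hi"
                  " from <dbname>." + line + ")")
          .load())
row = bounds.head()  # a single row: (min, max)

data = (spark.read.format("jdbc")
        .option("user", "user")
        .option("password", "pass")
        .option("url", "<Oracle-url>")
        .option("dbtable", "<dbname>." + line)
        .option("partitionColumn", line + "_ID")
        .option("lowerBound", int(row[0]))
        .option("upperBound", int(row[1]))
        .option("numPartitions", "40")
        .option("fetchsize", "10000")  # rows per round trip; Oracle's default is low
        .load())

Reading the bounds row by position (row[0], row[1]) sidesteps Oracle
upper-casing the column aliases.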


